Jefffrey commented on code in PR #8716:
URL: https://github.com/apache/arrow-rs/pull/8716#discussion_r3408835542
##########
arrow-cast/src/cast/run_array.rs:
##########
@@ -169,3 +171,396 @@ pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
let run_array = RunArray::<K>::try_new(&run_ends_array,
values_array.as_ref())?;
Ok(Arc::new(run_array))
}
+
+fn compute_run_boundaries(array: &ArrayRef) -> (Vec<usize>, Vec<usize>) {
+ if array.is_empty() {
+ return (Vec::new(), Vec::new());
+ }
+
+ use arrow_schema::DataType::*;
+
+ let array = array.as_ref();
+ downcast_primitive_array! {
+ array => runs_for_primitive(array),
+ Null => (vec![array.len()], vec![0]),
+ Boolean => runs_for_boolean(array.as_boolean()),
+ Utf8 => runs_for_string(array.as_string::<i32>()),
+ LargeUtf8 => runs_for_string(array.as_string::<i64>()),
+ Binary => runs_for_binary(array.as_binary::<i32>()),
+ LargeBinary => runs_for_binary(array.as_binary::<i64>()),
+ FixedSizeBinary(_) =>
runs_for_fixed_size_binary(array.as_fixed_size_binary()),
+ Dictionary(key_type, _) => match key_type.as_ref() {
+ Int8 => runs_for_dictionary::<Int8Type>(array.as_dictionary()),
+ Int16 => runs_for_dictionary::<Int16Type>(array.as_dictionary()),
+ Int32 => runs_for_dictionary::<Int32Type>(array.as_dictionary()),
+ Int64 => runs_for_dictionary::<Int64Type>(array.as_dictionary()),
+ UInt8 => runs_for_dictionary::<UInt8Type>(array.as_dictionary()),
+ UInt16 => runs_for_dictionary::<UInt16Type>(array.as_dictionary()),
+ UInt32 => runs_for_dictionary::<UInt32Type>(array.as_dictionary()),
+ UInt64 => runs_for_dictionary::<UInt64Type>(array.as_dictionary()),
+ _ => runs_generic(array),
+ },
+ _ => runs_generic(array),
+ }
+}
+
+fn runs_for_boolean(array: &BooleanArray) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+ let mut current_valid = array.is_valid(0);
+ let mut current_value = if current_valid { array.value(0) } else { false };
+
+ for idx in 1..len {
+ // Treat a change in validity the same as a change in value so null
boundaries are recorded.
+ let valid = array.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let value = array.value(idx);
+ if value != current_value {
+ current_value = value;
+ boundary = true;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_value = array.value(idx);
+ }
+ }
+
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_primitive<T: ArrowPrimitiveType>(
+ array: &PrimitiveArray<T>,
+) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let values = array.values();
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+
+ if array.null_count() == 0 {
+ let mut current = unsafe { *values.get_unchecked(0) };
+ let mut idx = 1;
+ while idx < len {
+ // Attempt to advance in 16-byte chunks before falling back to
scalar comparison.
+ let boundary = scan_run_end::<T>(values, current, idx);
+ if boundary == len {
+ break;
+ }
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(boundary);
+ current = unsafe { *values.get_unchecked(boundary) };
+ idx = boundary + 1;
+ }
+ return finalize_runs(run_boundaries, len);
+ }
+
+ let nulls = array
+ .nulls()
+ .expect("null_count > 0 implies a null buffer is present");
+ let mut current_valid = nulls.is_valid(0);
+ let mut current_value = unsafe { *values.get_unchecked(0) };
+ for idx in 1..len {
+ let valid = nulls.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let value = unsafe { *values.get_unchecked(idx) };
+ if value != current_value {
+ current_value = value;
+ boundary = true;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_value = unsafe { *values.get_unchecked(idx) };
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_binary<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) ->
(Vec<usize>, Vec<usize>) {
+ let mut to_usize = |v: O| v.as_usize();
+ runs_for_binary_like(
+ array.len(),
+ array.null_count(),
+ array.value_offsets(),
+ array.value_data(),
+ |idx| array.is_valid(idx),
+ &mut to_usize,
+ )
+}
+
+fn runs_for_binary_like<T: Copy>(
+ len: usize,
+ null_count: usize,
+ offsets: &[T],
+ values: &[u8],
+ mut is_valid: impl FnMut(usize) -> bool,
+ to_usize: &mut impl FnMut(T) -> usize,
+) -> (Vec<usize>, Vec<usize>) {
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+
+ if null_count == 0 {
+ let mut current_start = to_usize(offsets[0]);
+ let mut current_end = to_usize(offsets[1]);
+ for idx in 1..len {
+ let start = to_usize(offsets[idx]);
+ let end = to_usize(offsets[idx + 1]);
+ // Any difference in byte length or payload means a new run.
+ if (end - start) != (current_end - current_start)
+ || values[start..end] != values[current_start..current_end]
+ {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ current_start = start;
+ current_end = end;
+ }
+ }
+ } else {
+ let mut current_valid = is_valid(0);
+ let mut current_range = (to_usize(offsets[0]), to_usize(offsets[1]));
+ for idx in 1..len {
+ let valid = is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let start = to_usize(offsets[idx]);
+ let end = to_usize(offsets[idx + 1]);
+ let (current_start, current_end) = current_range;
+ // Keep reusing the current byte-range as long as both
validity and payload match.
+ if (end - start) != (current_end - current_start)
+ || values[start..end] != values[current_start..current_end]
+ {
+ boundary = true;
+ current_range = (start, end);
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_range = (to_usize(offsets[idx]),
to_usize(offsets[idx + 1]));
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_string<O: OffsetSizeTrait>(array: &GenericStringArray<O>) ->
(Vec<usize>, Vec<usize>) {
+ let mut to_usize = |v: O| v.as_usize();
+ runs_for_binary_like(
+ array.len(),
+ array.null_count(),
+ array.value_offsets(),
+ array.value_data(),
+ |idx| array.is_valid(idx),
+ &mut to_usize,
+ )
+}
+
+fn runs_for_fixed_size_binary(array: &FixedSizeBinaryArray) -> (Vec<usize>,
Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let width = array.value_length() as usize;
+ let values = array.value_data();
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+ if array.null_count() == 0 {
+ let mut current_slice = &values[0..width];
+ for idx in 1..len {
+ let start = idx * width;
+ let slice = &values[start..start + width];
+ // Width is constant, so a simple byte slice comparison suffices.
+ if slice != current_slice {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ current_slice = slice;
+ }
+ }
+ } else {
+ let nulls = array
+ .nulls()
+ .expect("null_count > 0 implies a null buffer is present");
+ let mut current_valid = nulls.is_valid(0);
+ let mut current_slice = &values[0..width];
+ for idx in 1..len {
+ let valid = nulls.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let start = idx * width;
+ let slice = &values[start..start + width];
+ if slice != current_slice {
+ boundary = true;
+ current_slice = slice;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ let start = idx * width;
+ current_slice = &values[start..start + width];
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_dictionary<K: ArrowDictionaryKeyType>(
+ array: &DictionaryArray<K>,
+) -> (Vec<usize>, Vec<usize>) {
+ runs_for_primitive(array.keys())
+}
+
+fn runs_generic(array: &dyn Array) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+ let mut current_data = array.slice(0, 1).to_data();
+ for idx in 1..len {
+ let next_data = array.slice(idx, 1).to_data();
+ // Fallback for exotic types: compare `ArrayData` views directly.
+ if current_data != next_data {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ current_data = next_data;
+ }
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn trivial_runs(len: usize) -> Option<(Vec<usize>, Vec<usize>)> {
+ match len {
+ 0 => Some((Vec::new(), Vec::new())),
+ 1 => Some((vec![1], vec![0])),
+ _ => None,
+ }
+}
+
+#[inline]
+fn ensure_capacity(vec: &mut Vec<usize>, total_len: usize) {
+ if vec.len() == vec.capacity() {
+ let remaining = total_len.saturating_sub(vec.len());
+ vec.reserve(remaining.max(1));
+ }
+}
Review Comment:
One thing that bugs me about this function is that `total_len` is always
constant, yet the function is called inside the loop, so the only thing that
changes is the capacity/len of the input vector. It reserves up to `total_len`
once, and after that it would only ever reserve 1 more (though I don't think
that case can actually happen).
##########
arrow-cast/src/cast/run_array.rs:
##########
@@ -169,3 +171,396 @@ pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
let run_array = RunArray::<K>::try_new(&run_ends_array,
values_array.as_ref())?;
Ok(Arc::new(run_array))
}
+
+fn compute_run_boundaries(array: &ArrayRef) -> (Vec<usize>, Vec<usize>) {
+ if array.is_empty() {
+ return (Vec::new(), Vec::new());
+ }
Review Comment:
```suggestion
if let Some(runs) = trivial_runs(array.len()) {
return runs;
}
```
This hoists the call to `trivial_runs()` out of each helper (e.g.
`runs_for_primitive`, `runs_for_boolean`).
##########
arrow-cast/src/cast/run_array.rs:
##########
@@ -169,3 +171,396 @@ pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
let run_array = RunArray::<K>::try_new(&run_ends_array,
values_array.as_ref())?;
Ok(Arc::new(run_array))
}
+
+fn compute_run_boundaries(array: &ArrayRef) -> (Vec<usize>, Vec<usize>) {
+ if array.is_empty() {
+ return (Vec::new(), Vec::new());
+ }
+
+ use arrow_schema::DataType::*;
+
+ let array = array.as_ref();
+ downcast_primitive_array! {
+ array => runs_for_primitive(array),
+ Null => (vec![array.len()], vec![0]),
+ Boolean => runs_for_boolean(array.as_boolean()),
+ Utf8 => runs_for_string(array.as_string::<i32>()),
+ LargeUtf8 => runs_for_string(array.as_string::<i64>()),
+ Binary => runs_for_binary(array.as_binary::<i32>()),
+ LargeBinary => runs_for_binary(array.as_binary::<i64>()),
+ FixedSizeBinary(_) =>
runs_for_fixed_size_binary(array.as_fixed_size_binary()),
+ Dictionary(key_type, _) => match key_type.as_ref() {
+ Int8 => runs_for_dictionary::<Int8Type>(array.as_dictionary()),
+ Int16 => runs_for_dictionary::<Int16Type>(array.as_dictionary()),
+ Int32 => runs_for_dictionary::<Int32Type>(array.as_dictionary()),
+ Int64 => runs_for_dictionary::<Int64Type>(array.as_dictionary()),
+ UInt8 => runs_for_dictionary::<UInt8Type>(array.as_dictionary()),
+ UInt16 => runs_for_dictionary::<UInt16Type>(array.as_dictionary()),
+ UInt32 => runs_for_dictionary::<UInt32Type>(array.as_dictionary()),
+ UInt64 => runs_for_dictionary::<UInt64Type>(array.as_dictionary()),
+ _ => runs_generic(array),
+ },
+ _ => runs_generic(array),
+ }
+}
+
+fn runs_for_boolean(array: &BooleanArray) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+ let mut current_valid = array.is_valid(0);
+ let mut current_value = if current_valid { array.value(0) } else { false };
+
+ for idx in 1..len {
+ // Treat a change in validity the same as a change in value so null
boundaries are recorded.
+ let valid = array.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let value = array.value(idx);
+ if value != current_value {
+ current_value = value;
+ boundary = true;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_value = array.value(idx);
+ }
+ }
+
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_primitive<T: ArrowPrimitiveType>(
+ array: &PrimitiveArray<T>,
+) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let values = array.values();
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+
+ if array.null_count() == 0 {
+ let mut current = unsafe { *values.get_unchecked(0) };
+ let mut idx = 1;
+ while idx < len {
+ // Attempt to advance in 16-byte chunks before falling back to
scalar comparison.
+ let boundary = scan_run_end::<T>(values, current, idx);
+ if boundary == len {
+ break;
+ }
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(boundary);
+ current = unsafe { *values.get_unchecked(boundary) };
+ idx = boundary + 1;
+ }
+ return finalize_runs(run_boundaries, len);
+ }
+
+ let nulls = array
+ .nulls()
+ .expect("null_count > 0 implies a null buffer is present");
+ let mut current_valid = nulls.is_valid(0);
+ let mut current_value = unsafe { *values.get_unchecked(0) };
+ for idx in 1..len {
+ let valid = nulls.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let value = unsafe { *values.get_unchecked(idx) };
+ if value != current_value {
+ current_value = value;
+ boundary = true;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_value = unsafe { *values.get_unchecked(idx) };
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_binary<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) ->
(Vec<usize>, Vec<usize>) {
+ let mut to_usize = |v: O| v.as_usize();
+ runs_for_binary_like(
+ array.len(),
+ array.null_count(),
+ array.value_offsets(),
+ array.value_data(),
+ |idx| array.is_valid(idx),
+ &mut to_usize,
+ )
+}
+
+fn runs_for_binary_like<T: Copy>(
+ len: usize,
+ null_count: usize,
+ offsets: &[T],
+ values: &[u8],
+ mut is_valid: impl FnMut(usize) -> bool,
+ to_usize: &mut impl FnMut(T) -> usize,
+) -> (Vec<usize>, Vec<usize>) {
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+
+ if null_count == 0 {
+ let mut current_start = to_usize(offsets[0]);
+ let mut current_end = to_usize(offsets[1]);
+ for idx in 1..len {
+ let start = to_usize(offsets[idx]);
+ let end = to_usize(offsets[idx + 1]);
+ // Any difference in byte length or payload means a new run.
+ if (end - start) != (current_end - current_start)
+ || values[start..end] != values[current_start..current_end]
+ {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ current_start = start;
+ current_end = end;
+ }
+ }
+ } else {
+ let mut current_valid = is_valid(0);
+ let mut current_range = (to_usize(offsets[0]), to_usize(offsets[1]));
+ for idx in 1..len {
+ let valid = is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let start = to_usize(offsets[idx]);
+ let end = to_usize(offsets[idx + 1]);
+ let (current_start, current_end) = current_range;
+ // Keep reusing the current byte-range as long as both
validity and payload match.
+ if (end - start) != (current_end - current_start)
+ || values[start..end] != values[current_start..current_end]
+ {
+ boundary = true;
+ current_range = (start, end);
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_range = (to_usize(offsets[idx]),
to_usize(offsets[idx + 1]));
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_string<O: OffsetSizeTrait>(array: &GenericStringArray<O>) ->
(Vec<usize>, Vec<usize>) {
+ let mut to_usize = |v: O| v.as_usize();
+ runs_for_binary_like(
+ array.len(),
+ array.null_count(),
+ array.value_offsets(),
+ array.value_data(),
+ |idx| array.is_valid(idx),
+ &mut to_usize,
+ )
+}
+
+fn runs_for_fixed_size_binary(array: &FixedSizeBinaryArray) -> (Vec<usize>,
Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let width = array.value_length() as usize;
+ let values = array.value_data();
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+ if array.null_count() == 0 {
+ let mut current_slice = &values[0..width];
+ for idx in 1..len {
+ let start = idx * width;
+ let slice = &values[start..start + width];
+ // Width is constant, so a simple byte slice comparison suffices.
+ if slice != current_slice {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ current_slice = slice;
+ }
+ }
+ } else {
+ let nulls = array
+ .nulls()
+ .expect("null_count > 0 implies a null buffer is present");
+ let mut current_valid = nulls.is_valid(0);
+ let mut current_slice = &values[0..width];
+ for idx in 1..len {
+ let valid = nulls.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let start = idx * width;
+ let slice = &values[start..start + width];
+ if slice != current_slice {
+ boundary = true;
+ current_slice = slice;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ let start = idx * width;
+ current_slice = &values[start..start + width];
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_dictionary<K: ArrowDictionaryKeyType>(
+ array: &DictionaryArray<K>,
+) -> (Vec<usize>, Vec<usize>) {
+ runs_for_primitive(array.keys())
+}
+
+fn runs_generic(array: &dyn Array) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+ let mut current_data = array.slice(0, 1).to_data();
+ for idx in 1..len {
+ let next_data = array.slice(idx, 1).to_data();
+ // Fallback for exotic types: compare `ArrayData` views directly.
+ if current_data != next_data {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ current_data = next_data;
+ }
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn trivial_runs(len: usize) -> Option<(Vec<usize>, Vec<usize>)> {
+ match len {
+ 0 => Some((Vec::new(), Vec::new())),
+ 1 => Some((vec![1], vec![0])),
+ _ => None,
+ }
+}
+
+#[inline]
+fn ensure_capacity(vec: &mut Vec<usize>, total_len: usize) {
+ if vec.len() == vec.capacity() {
+ let remaining = total_len.saturating_sub(vec.len());
+ vec.reserve(remaining.max(1));
+ }
+}
+
+fn finalize_runs(mut run_boundaries: Vec<usize>, len: usize) -> (Vec<usize>,
Vec<usize>) {
+ let mut values_indexes = Vec::with_capacity(run_boundaries.len() + 1);
+ // Values array always pulls the first element of each run; index 0 is by
definition a run start.
+ values_indexes.push(0);
+ values_indexes.extend_from_slice(&run_boundaries);
+ run_boundaries.push(len);
+ (run_boundaries, values_indexes)
+}
+
+#[inline]
+fn scan_run_end<T: ArrowPrimitiveType>(
+ values: &[T::Native],
+ current: T::Native,
+ start: usize,
+) -> usize {
+ let element_size = std::mem::size_of::<T::Native>();
+ // Only attempt the chunked search when the element size divides evenly
into 16 bytes.
+ if element_size <= 8 && 16 % element_size == 0 {
+ let elements_per_chunk = 16 / element_size;
+ return scan_run_end_chunk::<T>(values, current, start,
elements_per_chunk, element_size);
+ }
+ scan_run_end_scalar::<T>(values, current, start)
+}
+
+#[inline]
+fn scan_run_end_chunk<T: ArrowPrimitiveType>(
+ values: &[T::Native],
+ current: T::Native,
+ start: usize,
+ elements_per_chunk: usize,
+ element_size: usize,
+) -> usize {
+ let len = values.len();
+ let mut idx = start;
+ if idx >= len {
+ return len;
+ }
+
+ let mut pattern_bytes = [0u8; 16];
+ // Safety: `T::Native` is guaranteed by `ArrowPrimitiveType` to have a
plain-old-data layout,
+ // allowing the value to be viewed as raw bytes. We copy exactly
`element_size` bytes, so the
+ // slice built from `current` stays within bounds.
+ unsafe {
+ let value_bytes =
+ std::slice::from_raw_parts(&current as *const T::Native as *const
u8, element_size);
+ for chunk in pattern_bytes.chunks_mut(element_size) {
+ chunk.copy_from_slice(value_bytes);
+ }
+ }
+ let pattern = u128::from_ne_bytes(pattern_bytes);
+
+ while idx + elements_per_chunk <= len {
+ // SAFETY: pointer arithmetic stays within the backing slice;
unaligned reads are allowed.
+ let chunk = unsafe { (values.as_ptr().add(idx) as *const
u128).read_unaligned() };
+ if chunk != pattern {
+ for offset in 0..elements_per_chunk {
+ let value = unsafe { *values.get_unchecked(idx + offset) };
+ if value != current {
+ return idx + offset;
+ }
+ }
+ }
+ idx += elements_per_chunk;
+ }
Review Comment:
With the amount of unsafe here, it would be nice if we could ensure we run
this code through miri (if we don't already) 🤔
##########
arrow-cast/src/cast/run_array.rs:
##########
@@ -169,3 +171,396 @@ pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
let run_array = RunArray::<K>::try_new(&run_ends_array,
values_array.as_ref())?;
Ok(Arc::new(run_array))
}
+
+fn compute_run_boundaries(array: &ArrayRef) -> (Vec<usize>, Vec<usize>) {
+ if array.is_empty() {
+ return (Vec::new(), Vec::new());
+ }
+
+ use arrow_schema::DataType::*;
+
+ let array = array.as_ref();
+ downcast_primitive_array! {
+ array => runs_for_primitive(array),
+ Null => (vec![array.len()], vec![0]),
+ Boolean => runs_for_boolean(array.as_boolean()),
+ Utf8 => runs_for_string(array.as_string::<i32>()),
+ LargeUtf8 => runs_for_string(array.as_string::<i64>()),
+ Binary => runs_for_binary(array.as_binary::<i32>()),
+ LargeBinary => runs_for_binary(array.as_binary::<i64>()),
+ FixedSizeBinary(_) =>
runs_for_fixed_size_binary(array.as_fixed_size_binary()),
+ Dictionary(key_type, _) => match key_type.as_ref() {
+ Int8 => runs_for_dictionary::<Int8Type>(array.as_dictionary()),
+ Int16 => runs_for_dictionary::<Int16Type>(array.as_dictionary()),
+ Int32 => runs_for_dictionary::<Int32Type>(array.as_dictionary()),
+ Int64 => runs_for_dictionary::<Int64Type>(array.as_dictionary()),
+ UInt8 => runs_for_dictionary::<UInt8Type>(array.as_dictionary()),
+ UInt16 => runs_for_dictionary::<UInt16Type>(array.as_dictionary()),
+ UInt32 => runs_for_dictionary::<UInt32Type>(array.as_dictionary()),
+ UInt64 => runs_for_dictionary::<UInt64Type>(array.as_dictionary()),
+ _ => runs_generic(array),
+ },
+ _ => runs_generic(array),
+ }
+}
+
+fn runs_for_boolean(array: &BooleanArray) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+ let mut current_valid = array.is_valid(0);
+ let mut current_value = if current_valid { array.value(0) } else { false };
+
+ for idx in 1..len {
+ // Treat a change in validity the same as a change in value so null
boundaries are recorded.
+ let valid = array.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let value = array.value(idx);
+ if value != current_value {
+ current_value = value;
+ boundary = true;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_value = array.value(idx);
+ }
+ }
+
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_primitive<T: ArrowPrimitiveType>(
+ array: &PrimitiveArray<T>,
+) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let values = array.values();
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+
+ if array.null_count() == 0 {
+ let mut current = unsafe { *values.get_unchecked(0) };
+ let mut idx = 1;
+ while idx < len {
+ // Attempt to advance in 16-byte chunks before falling back to
scalar comparison.
+ let boundary = scan_run_end::<T>(values, current, idx);
+ if boundary == len {
+ break;
+ }
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(boundary);
+ current = unsafe { *values.get_unchecked(boundary) };
+ idx = boundary + 1;
+ }
+ return finalize_runs(run_boundaries, len);
+ }
+
+ let nulls = array
+ .nulls()
+ .expect("null_count > 0 implies a null buffer is present");
+ let mut current_valid = nulls.is_valid(0);
+ let mut current_value = unsafe { *values.get_unchecked(0) };
+ for idx in 1..len {
+ let valid = nulls.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let value = unsafe { *values.get_unchecked(idx) };
+ if value != current_value {
+ current_value = value;
+ boundary = true;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_value = unsafe { *values.get_unchecked(idx) };
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_binary<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) ->
(Vec<usize>, Vec<usize>) {
+ let mut to_usize = |v: O| v.as_usize();
+ runs_for_binary_like(
+ array.len(),
+ array.null_count(),
+ array.value_offsets(),
+ array.value_data(),
+ |idx| array.is_valid(idx),
+ &mut to_usize,
+ )
+}
+
+fn runs_for_binary_like<T: Copy>(
+ len: usize,
+ null_count: usize,
+ offsets: &[T],
+ values: &[u8],
+ mut is_valid: impl FnMut(usize) -> bool,
+ to_usize: &mut impl FnMut(T) -> usize,
+) -> (Vec<usize>, Vec<usize>) {
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+
+ if null_count == 0 {
+ let mut current_start = to_usize(offsets[0]);
+ let mut current_end = to_usize(offsets[1]);
+ for idx in 1..len {
+ let start = to_usize(offsets[idx]);
+ let end = to_usize(offsets[idx + 1]);
+ // Any difference in byte length or payload means a new run.
+ if (end - start) != (current_end - current_start)
+ || values[start..end] != values[current_start..current_end]
+ {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ current_start = start;
+ current_end = end;
+ }
+ }
+ } else {
+ let mut current_valid = is_valid(0);
+ let mut current_range = (to_usize(offsets[0]), to_usize(offsets[1]));
+ for idx in 1..len {
+ let valid = is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let start = to_usize(offsets[idx]);
+ let end = to_usize(offsets[idx + 1]);
+ let (current_start, current_end) = current_range;
+ // Keep reusing the current byte-range as long as both
validity and payload match.
+ if (end - start) != (current_end - current_start)
+ || values[start..end] != values[current_start..current_end]
+ {
+ boundary = true;
+ current_range = (start, end);
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ current_range = (to_usize(offsets[idx]),
to_usize(offsets[idx + 1]));
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_string<O: OffsetSizeTrait>(array: &GenericStringArray<O>) ->
(Vec<usize>, Vec<usize>) {
+ let mut to_usize = |v: O| v.as_usize();
+ runs_for_binary_like(
+ array.len(),
+ array.null_count(),
+ array.value_offsets(),
+ array.value_data(),
+ |idx| array.is_valid(idx),
+ &mut to_usize,
+ )
+}
+
+fn runs_for_fixed_size_binary(array: &FixedSizeBinaryArray) -> (Vec<usize>,
Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let width = array.value_length() as usize;
+ let values = array.value_data();
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+ if array.null_count() == 0 {
+ let mut current_slice = &values[0..width];
+ for idx in 1..len {
+ let start = idx * width;
+ let slice = &values[start..start + width];
+ // Width is constant, so a simple byte slice comparison suffices.
+ if slice != current_slice {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ current_slice = slice;
+ }
+ }
+ } else {
+ let nulls = array
+ .nulls()
+ .expect("null_count > 0 implies a null buffer is present");
+ let mut current_valid = nulls.is_valid(0);
+ let mut current_slice = &values[0..width];
+ for idx in 1..len {
+ let valid = nulls.is_valid(idx);
+ let mut boundary = false;
+ if current_valid && valid {
+ let start = idx * width;
+ let slice = &values[start..start + width];
+ if slice != current_slice {
+ boundary = true;
+ current_slice = slice;
+ }
+ } else if current_valid != valid {
+ boundary = true;
+ if valid {
+ let start = idx * width;
+ current_slice = &values[start..start + width];
+ }
+ }
+ if boundary {
+ ensure_capacity(&mut run_boundaries, len);
+ run_boundaries.push(idx);
+ }
+ current_valid = valid;
+ }
+ }
+
+ finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_dictionary<K: ArrowDictionaryKeyType>(
+ array: &DictionaryArray<K>,
+) -> (Vec<usize>, Vec<usize>) {
+ runs_for_primitive(array.keys())
+}
+
+fn runs_generic(array: &dyn Array) -> (Vec<usize>, Vec<usize>) {
+ let len = array.len();
+ if let Some(runs) = trivial_runs(len) {
+ return runs;
+ }
+
+ let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
Review Comment:
I'm seeing this `Vec::with_capacity(len / 64 + 2)` multiple times. Is this
just a guesstimate, or is it based on something?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]