richox opened a new issue, #6813:
URL: https://github.com/apache/arrow-rs/issues/6813
**Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
Memory prefetching is widely used when randomly accessing array items, which makes it a good fit for some cases of the take/interleave kernels.
I ran a quick benchmark; it shows that for completely random input, interleave with prefetching is about 2x faster than the current interleave implementation:
```
$ cargo run --release
disable prefetch time: 1.305 sec
enable prefetch time: 0.695 sec
disable prefetch time: 1.331 sec
enable prefetch time: 0.690 sec
disable prefetch time: 1.318 sec
enable prefetch time: 0.724 sec
```
Benchmark code:
```rust
#![feature(core_intrinsics)]

use std::error::Error;
use std::intrinsics::prefetch_read_data;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::*;
use arrow::buffer::*;
use arrow::datatypes::*;
use arrow::error::ArrowError;

fn main() -> Result<(), Box<dyn Error>> {
    let generate_random_int_string = || format!("SomeTestStr={}", rand::random::<u64>());

    // build 1000 string arrays with 1000 random values each
    let mut arrays: Vec<ArrayRef> = vec![];
    for _ in 0..1000 {
        let str_array = (0..1000)
            .map(|_| generate_random_int_string())
            .collect::<Vec<_>>();
        arrays.push(Arc::new(StringArray::from_iter_values(str_array)));
    }

    // generate 10M completely random (array_index, value_index) pairs
    let mut random_indices = vec![];
    for _ in 0..10000000 {
        random_indices.push((
            rand::random::<usize>() % arrays.len(),
            rand::random::<usize>() % arrays[0].len(),
        ));
    }
    let random_indices_len = random_indices.len();
    for i in 0..random_indices_len {
        random_indices.swap(i, rand::random::<usize>() % random_indices_len);
    }

    fn timer<T>(name: &str, f: impl FnOnce() -> T) -> T {
        let start_time = Instant::now();
        let ret = f();
        println!("{name} time: {:.3} sec", start_time.elapsed().as_secs_f64());
        ret
    }

    // warm up
    assert_eq!(
        &interleave_without_prefetch(&arrays, &random_indices)?,
        &interleave_with_prefetch(&arrays, &random_indices)?,
    );

    // benchmark
    for _ in 0..3 {
        let batch1 = timer("disable prefetch", || {
            interleave_without_prefetch(&arrays, &random_indices)
        })?;
        let batch2 = timer(" enable prefetch", || {
            interleave_with_prefetch(&arrays, &random_indices)
        })?;
        assert_eq!(&batch1, &batch2);
    }
    Ok(())
}

// baseline: the current arrow::compute::interleave kernel
fn interleave_without_prefetch(
    values: &[ArrayRef],
    indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {
    arrow::compute::interleave(&values.iter().map(|v| v.as_ref()).collect::<Vec<_>>(), indices)
}

// prefetching variant: specialized for Utf8/Binary, falls back to the stock kernel otherwise
fn interleave_with_prefetch(
    values: &[ArrayRef],
    indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {
    struct Interleave<'a, T> {
        arrays: Vec<&'a T>,
        nulls: Option<NullBuffer>,
    }

    impl<'a, T: Array + 'static> Interleave<'a, T> {
        fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
            let mut has_nulls = false;
            let arrays: Vec<&T> = values
                .iter()
                .map(|x| {
                    has_nulls = has_nulls || x.null_count() != 0;
                    x.as_any().downcast_ref().unwrap()
                })
                .collect();
            let nulls = match has_nulls {
                true => {
                    let mut builder = BooleanBufferBuilder::new(indices.len());
                    for (a, b) in indices {
                        let v = arrays[*a].is_valid(*b);
                        builder.append(v);
                    }
                    Some(NullBuffer::new(builder.finish()))
                }
                false => None,
            };
            Self { arrays, nulls }
        }
    }

    fn interleave_bytes<T: ByteArrayType>(
        values: &[&dyn Array],
        indices: &[(usize, usize)],
    ) -> Result<ArrayRef, ArrowError> {
        let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);

        // compute the offsets and the total value capacity
        let mut capacity = 0;
        let mut offsets = BufferBuilder::<T::Offset>::new(indices.len() + 1);
        offsets.append(T::Offset::from_usize(0).unwrap());
        for (a, b) in indices {
            let o = interleaved.arrays[*a].value_offsets();
            let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
            capacity += element_len;
            offsets.append(T::Offset::from_usize(capacity).expect("overflow"));
        }

        let mut values = MutableBuffer::new(capacity);
        for (i, (a, b)) in indices.iter().enumerate() {
            ////////////////////////////////////////////////////////////
            // prefetch next values
            ////////////////////////////////////////////////////////////
            const PREFETCH_AHEAD: usize = 4;
            if i + PREFETCH_AHEAD < indices.len() {
                let (pa, pb) = indices[i + PREFETCH_AHEAD];
                unsafe {
                    let array = interleaved.arrays.get_unchecked(pa);
                    let start = *array.value_offsets().get_unchecked(pb);
                    let ptr = array.values().as_ptr().wrapping_add(start.as_usize());
                    prefetch_read_data(ptr, 3);
                }
            }
            values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
        }

        // Safety: safe by construction
        let array = unsafe {
            let offsets = OffsetBuffer::new_unchecked(offsets.finish().into());
            GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
        };
        Ok(Arc::new(array))
    }

    let values = values.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
    match values.get(0).map(|v| v.data_type()) {
        Some(DataType::Utf8) => interleave_bytes::<GenericStringType<i32>>(&values, indices),
        Some(DataType::Binary) => interleave_bytes::<GenericBinaryType<i32>>(&values, indices),
        _ => arrow::compute::interleave(&values, indices),
    }
}
```
**Describe the solution you'd like**
1. I would like to introduce memory prefetching in arrow-rs. Since not all scenarios benefit from prefetching, I suggest adding a separate kernel function such as `interleave_with_memory_prefetching` so we don't break the current implementation.
2. `prefetch_read_data` is still unstable, and I'm not sure how we can use it in stable Rust (see the sketch after this list for one possible direction).
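Regarding point 2, here is a minimal sketch of a possible stable-Rust workaround (the helper name `prefetch_read` is hypothetical, not an existing arrow-rs API): wrap the already-stable per-architecture intrinsics, e.g. `_mm_prefetch` from `std::arch::x86_64`, in a small helper that compiles to a no-op on targets without a stable prefetch intrinsic. The new kernel could then call this helper in its hot loop instead of the unstable `prefetch_read_data`:
```rust
// Sketch only: a stable-Rust prefetch hint. `prefetch_read` is a hypothetical
// helper name, not part of arrow-rs.
#[inline(always)]
fn prefetch_read<T>(ptr: *const T) {
    #[cfg(target_arch = "x86_64")]
    unsafe {
        use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
        // Prefetch into all cache levels; this is purely a hint and does not fault.
        _mm_prefetch::<_MM_HINT_T0>(ptr as *const i8);
    }
    #[cfg(not(target_arch = "x86_64"))]
    {
        // No stable prefetch intrinsic on this target: compile to a no-op.
        let _ = ptr;
    }
}

fn main() {
    let data: Vec<u8> = (0u8..64).collect();
    // Hint that data[32..] will be read soon; correctness does not depend on it.
    prefetch_read(data.as_ptr().wrapping_add(32));
    assert_eq!(data[32], 32);
}
```
This would keep the kernel itself on stable Rust; the trade-off is that each architecture needs its own cfg branch, and targets without such an intrinsic simply lose the optimization.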
**Describe alternatives you've considered**
**Additional context**