richox opened a new issue, #6813:
URL: https://github.com/apache/arrow-rs/issues/6813

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   Memory prefetching is widely used when accessing array items in random order, which makes it well suited to some cases of the take/interleave kernels.
   
   I ran a quick benchmark. For completely random input, interleave with prefetching is nearly 2x faster than the current interleave implementation:
   
   ```
   $ cargo run --release
   disable prefetch time: 1.305 sec
    enable prefetch time: 0.695 sec
   disable prefetch time: 1.331 sec
    enable prefetch time: 0.690 sec
   disable prefetch time: 1.318 sec
    enable prefetch time: 0.724 sec
   ```
   
   benchmark code:
   ```rust
   #![feature(core_intrinsics)]
   
   use std::error::Error;
   use std::intrinsics::prefetch_read_data;
   use std::sync::Arc;
   use std::time::Instant;
   
   use arrow::array::*;
   use arrow::buffer::*;
   use arrow::datatypes::*;
   use arrow::error::ArrowError;
   
   /// Benchmarks string `interleave` with and without software prefetching.
   ///
   /// Builds 1000 `StringArray`s of 1000 random strings each, draws 10,000,000 random
   /// `(array, row)` pairs, randomly swaps them around, checks that both implementations
   /// agree, and then times three rounds of each implementation.
   fn main() -> Result<(), Box<dyn Error>> {
       // Source data: each string embeds a random u64 so the payloads differ.
       let arrays: Vec<ArrayRef> = (0..1000)
           .map(|_| {
               let strings = (0..1000)
                   .map(|_| format!("SomeTestStr={}", rand::random::<u64>()))
                   .collect::<Vec<_>>();
               Arc::new(StringArray::from_iter_values(strings)) as ArrayRef
           })
           .collect();

       // Random (array index, row index) pairs. Every array above has 1000 rows, so the
       // first array's length bounds the row index for all of them.
       let num_arrays = arrays.len();
       let num_rows = arrays[0].len();
       let mut random_indices: Vec<(usize, usize)> = (0..10000000)
           .map(|_| {
               let array_idx = rand::random::<usize>() % num_arrays;
               let row_idx = rand::random::<usize>() % num_rows;
               (array_idx, row_idx)
           })
           .collect();

       // Swap every position with a randomly chosen one.
       let total = random_indices.len();
       for pos in 0..total {
           let swap_with = rand::random::<usize>() % total;
           random_indices.swap(pos, swap_with);
       }

       // Runs `f` once, printing how long it took, and returns its result.
       fn timer<T>(name: &str, f: impl FnOnce() -> T) -> T {
           let started = Instant::now();
           let result = f();
           println!("{name} time: {:.3} sec", started.elapsed().as_secs_f64());
           result
       }

       // Warm up, and check that both implementations produce identical output.
       // Scoped so both results are dropped before timing starts.
       {
           let reference = interleave_without_prefetch(&arrays, &random_indices)?;
           let candidate = interleave_with_prefetch(&arrays, &random_indices)?;
           assert_eq!(&reference, &candidate);
       }

       // Timed rounds.
       for _round in 0..3 {
           let baseline = timer("disable prefetch", || {
               interleave_without_prefetch(&arrays, &random_indices)
           })?;
           let prefetched = timer(" enable prefetch", || {
               interleave_with_prefetch(&arrays, &random_indices)
           })?;
           assert_eq!(&baseline, &prefetched);
       }
       Ok(())
   }
   
   /// Baseline implementation: borrows each `ArrayRef` and delegates straight to
   /// `arrow::compute::interleave`.
   fn interleave_without_prefetch(
       values: &[ArrayRef],
       indices: &[(usize, usize)],
   ) -> Result<ArrayRef, ArrowError> {
       let mut borrowed = Vec::with_capacity(values.len());
       for array in values {
           borrowed.push(array.as_ref());
       }
       arrow::compute::interleave(&borrowed, indices)
   }
   
   /// Interleaves rows from `values` like `arrow::compute::interleave`, but issues a software
   /// prefetch for the value bytes a few rows ahead of the copy loop.
   ///
   /// `indices` holds `(array_index, row_index)` pairs. When every input array has the same
   /// type and that type is `Utf8`, `LargeUtf8`, `Binary` or `LargeBinary`, the prefetching
   /// path is used. Everything else (empty `values`, mixed types, other types) is delegated to
   /// `arrow::compute::interleave`, so errors are reported exactly as that kernel reports them.
   ///
   /// # Panics
   /// On the prefetching path, panics if an index is out of bounds, or if the total byte
   /// length of the selected values does not fit in the offset type.
   fn interleave_with_prefetch(
       values: &[ArrayRef],
       indices: &[(usize, usize)],
   ) -> Result<ArrayRef, ArrowError> {
       // How many rows ahead of the current copy position to prefetch.
       const PREFETCH_AHEAD: usize = 4;

       // Downcast input arrays plus the combined validity of the selected rows.
       struct Interleave<'a, T> {
           arrays: Vec<&'a T>,
           nulls: Option<NullBuffer>,
       }

       impl<'a, T: Array + 'static> Interleave<'a, T> {
           // Downcasts every array in `values` to `T` (panicking on mismatch; the caller
           // guarantees a uniform type). If any input has nulls, it also builds the validity
           // bitmap for `indices`.
           fn new(values: &[&'a dyn Array], indices: &[(usize, usize)]) -> Self {
               let mut has_nulls = false;
               let arrays: Vec<&T> = values
                   .iter()
                   .map(|x| {
                       has_nulls = has_nulls || x.null_count() != 0;
                       x.as_any().downcast_ref().unwrap()
                   })
                   .collect();

               let nulls = match has_nulls {
                   true => {
                       let mut builder = BooleanBufferBuilder::new(indices.len());
                       for (a, b) in indices {
                           let v = arrays[*a].is_valid(*b);
                           builder.append(v)
                       }
                       Some(NullBuffer::new(builder.finish()))
                   }
                   false => None,
               };

               Self { arrays, nulls }
           }
       }

       // Byte-array (string/binary) interleave with prefetching of upcoming values.
       fn interleave_bytes<T: ByteArrayType>(
           values: &[&dyn Array],
           indices: &[(usize, usize)],
       ) -> Result<ArrayRef, ArrowError> {
           let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);

           // Pass 1: compute the output offsets. The checked indexing here panics on any
           // out-of-bounds `(array, row)` pair. That is what makes the unchecked reads in
           // pass 2 sound.
           let mut capacity = 0;
           let mut offsets = BufferBuilder::<T::Offset>::new(indices.len() + 1);
           offsets.append(T::Offset::from_usize(0).unwrap());
           for (a, b) in indices {
               let o = interleaved.arrays[*a].value_offsets();
               let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
               capacity += element_len;
               // Same overflow behaviour as `arrow::compute::interleave`.
               offsets.append(T::Offset::from_usize(capacity).expect("overflow"));
           }

           // Pass 2: copy the value bytes, prefetching the row PREFETCH_AHEAD positions ahead.
           let mut values = MutableBuffer::new(capacity);
           for (i, (a, b)) in indices.iter().enumerate() {
               if let Some(&(pa, pb)) = indices.get(i + PREFETCH_AHEAD) {
                   // SAFETY: pass 1 already indexed `arrays[pa]` and `value_offsets()[pb + 1]`
                   // with bounds checks (it would have panicked otherwise), so both unchecked
                   // reads are in bounds. The computed pointer is only passed to a prefetch
                   // hint, which does not affect program behaviour.
                   // NOTE: on stable Rust, `core::arch::x86_64::_mm_prefetch` offers a similar
                   // hint on x86_64 targets.
                   unsafe {
                       let array = interleaved.arrays.get_unchecked(pa);
                       let start = *array.value_offsets().get_unchecked(pb);
                       let ptr = array.values().as_ptr().wrapping_add(start.as_usize());
                       prefetch_read_data(ptr, 3);
                   }
               }
               values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
           }

           // SAFETY: the offsets start at 0 and are a running sum of element lengths, so they
           // are non-decreasing. The last offset equals the number of bytes copied into
           // `values`. Every copied slice is a whole value of an input `GenericByteArray<T>`,
           // and the null buffer (if any) has one entry per index.
           let array = unsafe {
               let offsets = OffsetBuffer::new_unchecked(offsets.finish().into());
               GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
           };
           Ok(Arc::new(array))
       }

       let values = values.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
       let first_type = values.first().map(|v| v.data_type());
       // The fast path downcasts every array to the first array's type. Mixed inputs are
       // handed to arrow, which returns an error instead of panicking.
       let homogeneous = values.iter().all(|v| Some(v.data_type()) == first_type);
       match first_type {
           Some(DataType::Utf8) if homogeneous => {
               interleave_bytes::<GenericStringType<i32>>(&values, indices)
           }
           Some(DataType::LargeUtf8) if homogeneous => {
               interleave_bytes::<GenericStringType<i64>>(&values, indices)
           }
           Some(DataType::Binary) if homogeneous => {
               interleave_bytes::<GenericBinaryType<i32>>(&values, indices)
           }
           Some(DataType::LargeBinary) if homogeneous => {
               interleave_bytes::<GenericBinaryType<i64>>(&values, indices)
           }
           _ => arrow::compute::interleave(&values, indices),
       }
   }
   ```
   
   **Describe the solution you'd like**
   1. I would like to introduce memory prefetching in arrow-rs. Since not all scenarios benefit from prefetching, I suggest adding a separate kernel function such as `interleave_with_memory_prefetching`, so the current implementation is left unchanged.
   2. `prefetch_read_data` is still unstable, and I'm not sure how we can use it on stable Rust.
   
   **Describe alternatives you've considered**
   <!--
   A clear and concise description of any alternative solutions or features 
you've considered.
   -->
   
   **Additional context**
   <!--
   Add any other context or screenshots about the feature request here.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to