alamb commented on a change in pull request #8960:
URL: https://github.com/apache/arrow/pull/8960#discussion_r547260137
##########
File path: rust/arrow/src/array/transform/mod.rs
##########
@@ -46,16 +46,21 @@ struct _MutableArrayData<'a> {
pub len: usize,
pub null_buffer: MutableBuffer,
- pub buffers: Vec<MutableBuffer>,
+ pub buffer1: MutableBuffer,
Review comment:
I recommend comments here explaining the use of `buffer1` and `buffer2` for future readers who may not have the context of this PR.
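For example, a sketch of the kind of field comments that could help (wording is illustrative, inferred from how `variable_size.rs` uses the two buffers in this PR, not the final docs):
```
    pub len: usize,
    pub null_buffer: MutableBuffer,
    // The two data buffers a growing array may need:
    // * variable-sized arrays (e.g. Utf8/Binary): `buffer1` holds the offsets
    //   and `buffer2` holds the values;
    // * fixed-sized arrays: `buffer1` holds the values and `buffer2` is unused.
    pub buffer1: MutableBuffer,
    pub buffer2: MutableBuffer,
```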
##########
File path: rust/arrow/src/array/transform/utils.rs
##########
@@ -61,3 +61,17 @@ pub(super) fn extend_offsets<T: OffsetSizeTrait>(
buffer.extend_from_slice(last_offset.to_byte_slice());
});
}
+
+#[inline]
+pub(super) unsafe fn get_last_offset<T: OffsetSizeTrait>(
+ offset_buffer: &MutableBuffer,
+) -> T {
+ // JUSTIFICATION
+ // Benefit
+ // 20% performance improvement extend of variable sized arrays (see
bench `mutable_array`)
+ // Soundness
+ // * offset buffer is always extended in slices of T and aligned
accordingly.
+ // * Buffer[0] is initialized with one element, 0, and thus
`mutable_offsets.len() - 1` is always valid.
+ let offsets = offset_buffer.data().align_to::<T>().1;
Review comment:
I wonder if this would be a good place to use a `debug_assert`. Something like the following would ensure the data was actually aligned as intended:
```suggestion
let (prefix, offsets, suffix) = offset_buffer.data().align_to::<T>();
debug_assert!(prefix.len() == 0 && suffix.len() == 0);
```
##########
File path: rust/arrow/src/array/transform/utils.rs
##########
@@ -61,3 +61,17 @@ pub(super) fn extend_offsets<T: OffsetSizeTrait>(
buffer.extend_from_slice(last_offset.to_byte_slice());
});
}
+
+#[inline]
+pub(super) unsafe fn get_last_offset<T: OffsetSizeTrait>(
+ offset_buffer: &MutableBuffer,
+) -> T {
+ // JUSTIFICATION
+ // Benefit
+ // 20% performance improvement extend of variable sized arrays (see
bench `mutable_array`)
+ // Soundness
+ // * offset buffer is always extended in slices of T and aligned
accordingly.
+ // * Buffer[0] is initialized with one element, 0, and thus
`mutable_offsets.len() - 1` is always valid.
+ let offsets = offset_buffer.data().align_to::<T>().1;
Review comment:
Or maybe something more direct:
```
debug_assert!(*offsets.get_unchecked(offsets.len() - 1) == mutable_offsets[mutable_offsets.len() - 1]);
```
##########
File path: rust/arrow/src/compute/kernels/filter.rs
##########
@@ -17,841 +17,241 @@
//! Defines miscellaneous array kernels.
-use crate::array::*;
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
+use crate::error::Result;
use crate::record_batch::RecordBatch;
-use crate::{
- bitmap::Bitmap,
- buffer::{Buffer, MutableBuffer},
- util::bit_util,
-};
-use std::{mem, sync::Arc};
-
-/// trait for copying filtered null bitmap bits
-trait CopyNullBit {
- fn copy_null_bit(&mut self, source_index: usize);
- fn copy_null_bits(&mut self, source_index: usize, count: usize);
- fn null_count(&self) -> usize;
- fn null_buffer(&mut self) -> Buffer;
-}
-
-/// no-op null bitmap copy implementation,
-/// used when the filtered data array doesn't have a null bitmap
-struct NullBitNoop {}
-
-impl NullBitNoop {
- fn new() -> Self {
- NullBitNoop {}
- }
-}
-
-impl CopyNullBit for NullBitNoop {
- #[inline]
- fn copy_null_bit(&mut self, _source_index: usize) {
- // do nothing
- }
-
- #[inline]
- fn copy_null_bits(&mut self, _source_index: usize, _count: usize) {
- // do nothing
- }
-
- fn null_count(&self) -> usize {
- 0
- }
-
- fn null_buffer(&mut self) -> Buffer {
- Buffer::from([0u8; 0])
- }
+use crate::{array::*, util::bit_chunk_iterator::BitChunkIterator};
+use std::{iter::Enumerate, sync::Arc};
+
+/// Function that can filter arbitrary arrays
+pub type Filter<'a> = Box<Fn(&ArrayData) -> ArrayData + 'a>;
+
+/// Internal state of [SlicesIterator]
+#[derive(Debug, PartialEq)]
+enum State {
+ // it is iterating over bits of a mask (`u64`, steps of size of 1 slot)
+ Bits(u64),
+ // it is iterating over chunks (steps of size of 64 slots)
+ Chunks,
+ // it is iterating over the remainding bits (steps of size of 1 slot)
+ Remainder,
+ // nothing more to iterate.
+ Finish,
}
-/// null bitmap copy implementation,
-/// used when the filtered data array has a null bitmap
-struct NullBitSetter<'a> {
- target_buffer: MutableBuffer,
- source_bytes: &'a [u8],
- target_index: usize,
- null_count: usize,
+/// An iterator of `(usize, usize)` each representing an interval
`[start,end[` whose
+/// slots of a [BooleanArray] are true. Each interval corresponds to a
contiguous region of memory to be
+/// "taken" from an array to be filtered.
+#[derive(Debug)]
+struct SlicesIterator<'a> {
+ iter: Enumerate<BitChunkIterator<'a>>,
+ state: State,
+ filter_count: usize,
+ remainder_mask: u64,
+ remainder_len: usize,
+ chunk_len: usize,
+ len: usize,
+ start: usize,
+ on_region: bool,
+ current_chunk: usize,
+ current_bit: usize,
}
-impl<'a> NullBitSetter<'a> {
- fn new(null_bitmap: &'a Bitmap) -> Self {
- let null_bytes = null_bitmap.buffer_ref().data();
- // create null bitmap buffer with same length and initialize null
bitmap buffer to 1s
- let null_buffer =
- MutableBuffer::new(null_bytes.len()).with_bitset(null_bytes.len(),
true);
- NullBitSetter {
- source_bytes: null_bytes,
- target_buffer: null_buffer,
- target_index: 0,
- null_count: 0,
+impl<'a> SlicesIterator<'a> {
+ fn new(filter: &'a BooleanArray) -> Self {
+ let values = &filter.data_ref().buffers()[0];
+
+ // this operation is performed before iteration
+ // because it is fast and allows reserving all the needed memory
+ let filter_count = values.count_set_bits_offset(filter.offset(),
filter.len());
+
+ let chunks = values.bit_chunks(filter.offset(), filter.len());
+
+ Self {
+ iter: chunks.iter().enumerate(),
+ state: State::Chunks,
+ filter_count,
+ remainder_len: chunks.remainder_len(),
+ chunk_len: chunks.chunk_len(),
+ remainder_mask: chunks.remainder_bits(),
+ len: 0,
+ start: 0,
+ on_region: false,
+ current_chunk: 0,
+ current_bit: 0,
}
}
-}
-impl<'a> CopyNullBit for NullBitSetter<'a> {
#[inline]
- fn copy_null_bit(&mut self, source_index: usize) {
- if !bit_util::get_bit(self.source_bytes, source_index) {
- bit_util::unset_bit(self.target_buffer.data_mut(),
self.target_index);
- self.null_count += 1;
- }
- self.target_index += 1;
+ fn current_start(&self) -> usize {
+ self.current_chunk * 64 + self.current_bit
}
#[inline]
- fn copy_null_bits(&mut self, source_index: usize, count: usize) {
- for i in 0..count {
- self.copy_null_bit(source_index + i);
+ fn iterate_bits(&mut self, mask: u64, max: usize) -> Option<(usize,
usize)> {
+ while self.current_bit < max {
+ if (mask & (1 << self.current_bit)) != 0 {
+ if !self.on_region {
+ self.start = self.current_start();
+ self.on_region = true;
+ }
+ self.len += 1;
+ } else if self.on_region {
+ let result = (self.start, self.start + self.len);
+ self.len = 0;
+ self.on_region = false;
+ self.current_bit += 1;
+ return Some(result);
+ }
+ self.current_bit += 1;
}
+ self.current_bit = 0;
+ None
}
- fn null_count(&self) -> usize {
- self.null_count
- }
-
- fn null_buffer(&mut self) -> Buffer {
- self.target_buffer.resize(self.target_index);
- // use mem::replace to detach self.target_buffer from self so that it
can be returned
- let target_buffer = mem::replace(&mut self.target_buffer,
MutableBuffer::new(0));
- target_buffer.freeze()
- }
-}
-
-fn get_null_bit_setter<'a>(data_array: &'a impl Array) -> Box<CopyNullBit +
'a> {
- if let Some(null_bitmap) = data_array.data_ref().null_bitmap() {
- // only return an actual null bit copy implementation if null_bitmap
is set
- Box::new(NullBitSetter::new(null_bitmap))
- } else {
- // otherwise return a no-op copy null bit implementation
- // for improved performance when the filtered array doesn't contain
NULLs
- Box::new(NullBitNoop::new())
- }
-}
-
-// transmute filter array to u64
-// - optimize filtering with highly selective filters by skipping entire
batches of 64 filter bits
-// - if the data array being filtered doesn't have a null bitmap, no time is
wasted to copy a null bitmap
-fn filter_array_impl(
- filter_context: &FilterContext,
- data_array: &impl Array,
- array_type: DataType,
- value_size: usize,
-) -> Result<ArrayDataBuilder> {
- if filter_context.filter_len > data_array.len() {
- return Err(ArrowError::ComputeError(
- "Filter array cannot be larger than data array".to_string(),
- ));
- }
- let filtered_count = filter_context.filtered_count;
- let filter_mask = &filter_context.filter_mask;
- let filter_u64 = &filter_context.filter_u64;
- let data_bytes = data_array.data_ref().buffers()[0].data();
- let mut target_buffer = MutableBuffer::new(filtered_count * value_size);
- target_buffer.resize(filtered_count * value_size);
- let target_bytes = target_buffer.data_mut();
- let mut target_byte_index: usize = 0;
- let mut null_bit_setter = get_null_bit_setter(data_array);
- let null_bit_setter = null_bit_setter.as_mut();
- let all_ones_batch = !0u64;
- let data_array_offset = data_array.offset();
-
- for (i, filter_batch) in filter_u64.iter().enumerate() {
- // foreach u64 batch
- let filter_batch = *filter_batch;
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip entire batch
- continue;
- } else if filter_batch == all_ones_batch {
- // if batch == all 1s: copy all 64 values in one go
- let data_index = (i * 64) + data_array_offset;
- null_bit_setter.copy_null_bits(data_index, 64);
- let data_byte_index = data_index * value_size;
- let data_len = value_size * 64;
- target_bytes[target_byte_index..(target_byte_index + data_len)]
- .copy_from_slice(
- &data_bytes[data_byte_index..(data_byte_index + data_len)],
- );
- target_byte_index += data_len;
- continue;
- }
- for (j, filter_mask) in filter_mask.iter().enumerate() {
- // foreach bit in batch:
- if (filter_batch & *filter_mask) != 0 {
- let data_index = (i * 64) + j + data_array_offset;
- null_bit_setter.copy_null_bit(data_index);
- // if filter bit == 1: copy data value bytes
- let data_byte_index = data_index * value_size;
- target_bytes[target_byte_index..(target_byte_index +
value_size)]
- .copy_from_slice(
- &data_bytes[data_byte_index..(data_byte_index +
value_size)],
- );
- target_byte_index += value_size;
+ /// iterates over chunks.
+ #[inline]
+ fn iterate_chunks(&mut self) -> Option<(usize, usize)> {
+ while let Some((i, mask)) = self.iter.next() {
+ self.current_chunk = i;
+ if mask == 0 {
+ if self.on_region {
+ let result = (self.start, self.start + self.len);
+ self.len = 0;
+ self.on_region = false;
+ return Some(result);
+ }
+ } else if mask == 18446744073709551615u64 {
+ // = !0u64
+ if !self.on_region {
+ self.start = self.current_start();
+ self.on_region = true;
+ }
+ self.len += 64;
+ } else {
+ // there is a chunk that has a non-trivial mask => iterate
over bits.
+ self.state = State::Bits(mask);
+ return None;
}
}
+ // no more chunks => start iterating over the remainder
+ self.current_chunk = self.chunk_len;
+ self.state = State::Remainder;
+ None
}
-
- let mut array_data_builder = ArrayDataBuilder::new(array_type)
- .len(filtered_count)
- .add_buffer(target_buffer.freeze());
- if null_bit_setter.null_count() > 0 {
- array_data_builder = array_data_builder
- .null_count(null_bit_setter.null_count())
- .null_bit_buffer(null_bit_setter.null_buffer());
- }
-
- Ok(array_data_builder)
-}
-
-/// FilterContext can be used to improve performance when
-/// filtering multiple data arrays with the same filter array.
-#[derive(Debug)]
-pub struct FilterContext {
- filter_u64: Vec<u64>,
- filter_len: usize,
- filtered_count: usize,
- filter_mask: Vec<u64>,
-}
-
-macro_rules! filter_primitive_array {
- ($context:expr, $array:expr, $array_type:ident) => {{
- let input_array =
$array.as_any().downcast_ref::<$array_type>().unwrap();
- let output_array = $context.filter_primitive_array(input_array)?;
- Ok(Arc::new(output_array))
- }};
-}
-
-macro_rules! filter_dictionary_array {
- ($context:expr, $array:expr, $array_type:ident) => {{
- let input_array =
$array.as_any().downcast_ref::<$array_type>().unwrap();
- let output_array = $context.filter_dictionary_array(input_array)?;
- Ok(Arc::new(output_array))
- }};
}
-macro_rules! filter_boolean_item_list_array {
- ($context:expr, $array:expr, $list_type:ident, $list_builder_type:ident)
=> {{
- let input_array =
$array.as_any().downcast_ref::<$list_type>().unwrap();
- let values_builder = BooleanBuilder::new($context.filtered_count);
- let mut builder = $list_builder_type::new(values_builder);
- for i in 0..$context.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = $context.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip entire
batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & $context.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- builder.append(false)?;
- } else {
- let this_inner_list = input_array.value(data_index);
- let inner_list = this_inner_list
- .as_any()
- .downcast_ref::<BooleanArray>()
- .unwrap();
- for k in 0..inner_list.len() {
- if inner_list.is_null(k) {
- builder.values().append_null()?;
- } else {
-
builder.values().append_value(inner_list.value(k))?;
- }
- }
- builder.append(true)?;
+impl<'a> Iterator for SlicesIterator<'a> {
+ type Item = (usize, usize);
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self.state {
+ State::Chunks => {
+ match self.iterate_chunks() {
+ None => {
+ // iterating over chunks does not yield any new slice
=> continue to the next
+ self.current_bit = 0;
+ self.next()
}
+ other => other,
}
}
- }
- Ok(Arc::new(builder.finish()))
- }};
-}
-
-macro_rules! filter_primitive_item_list_array {
- ($context:expr, $array:expr, $item_type:ident, $list_type:ident,
$list_builder_type:ident) => {{
- let input_array =
$array.as_any().downcast_ref::<$list_type>().unwrap();
- let values_builder =
PrimitiveBuilder::<$item_type>::new($context.filtered_count);
- let mut builder = $list_builder_type::new(values_builder);
- for i in 0..$context.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = $context.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip entire
batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & $context.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- builder.append(false)?;
- } else {
- let this_inner_list = input_array.value(data_index);
- let inner_list = this_inner_list
- .as_any()
- .downcast_ref::<PrimitiveArray<$item_type>>()
- .unwrap();
- for k in 0..inner_list.len() {
- if inner_list.is_null(k) {
- builder.values().append_null()?;
- } else {
-
builder.values().append_value(inner_list.value(k))?;
- }
- }
- builder.append(true)?;
+ State::Bits(mask) => {
+ match self.iterate_bits(mask, 64) {
+ None => {
+ // iterating over bits does not yield any new slice =>
change back
+ // to chunks and continue to the next
+ self.state = State::Chunks;
+ self.next()
}
+ other => other,
}
}
- }
- Ok(Arc::new(builder.finish()))
- }};
-}
-
-macro_rules! filter_non_primitive_item_list_array {
- ($context:expr, $array:expr, $item_array_type:ident, $item_builder:ident,
$list_type:ident, $list_builder_type:ident) => {{
- let input_array =
$array.as_any().downcast_ref::<$list_type>().unwrap();
- let values_builder = $item_builder::new($context.filtered_count);
- let mut builder = $list_builder_type::new(values_builder);
- for i in 0..$context.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = $context.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip entire
batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & $context.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- builder.append(false)?;
- } else {
- let this_inner_list = input_array.value(data_index);
- let inner_list = this_inner_list
- .as_any()
- .downcast_ref::<$item_array_type>()
- .unwrap();
- for k in 0..inner_list.len() {
- if inner_list.is_null(k) {
- builder.values().append_null()?;
- } else {
-
builder.values().append_value(inner_list.value(k))?;
- }
+ State::Remainder => {
+ match self.iterate_bits(self.remainder_mask,
self.remainder_len) {
+ None => {
+ self.state = State::Finish;
+ if self.on_region {
+ Some((self.start, self.start + self.len))
+ } else {
+ None
}
- builder.append(true)?;
}
+ other => other,
}
}
+ State::Finish => None,
}
- Ok(Arc::new(builder.finish()))
- }};
-}
-
-impl FilterContext {
- /// Returns a new instance of FilterContext
- pub fn new(filter_array: &BooleanArray) -> Result<Self> {
- if filter_array.offset() > 0 {
- return Err(ArrowError::ComputeError(
- "Filter array cannot have offset > 0".to_string(),
- ));
- }
- let filter_mask: Vec<u64> = (0..64).map(|x| 1u64 << x).collect();
- let filter_buffer = &filter_array.data_ref().buffers()[0];
- let filtered_count = filter_buffer.count_set_bits_offset(0,
filter_array.len());
-
- let filter_bytes = filter_buffer.data();
-
- // add to the resulting len so is is a multiple of the size of u64
- let pad_addional_len = 8 - filter_bytes.len() % 8;
-
- // transmute filter_bytes to &[u64]
- let mut u64_buffer = MutableBuffer::new(filter_bytes.len() +
pad_addional_len);
-
- u64_buffer.extend_from_slice(filter_bytes);
- u64_buffer.extend_from_slice(&vec![0; pad_addional_len]);
- let mut filter_u64 = u64_buffer.typed_data_mut::<u64>().to_owned();
-
- // mask of any bits outside of the given len
- if filter_array.len() % 64 != 0 {
- let last_idx = filter_u64.len() - 1;
- let mask = u64::MAX >> (64 - filter_array.len() % 64);
- filter_u64[last_idx] &= mask;
- }
-
- Ok(FilterContext {
- filter_u64,
- filter_len: filter_array.len(),
- filtered_count,
- filter_mask,
- })
- }
-
- /// Returns a new array, containing only the elements matching the filter
- pub fn filter(&self, array: &Array) -> Result<ArrayRef> {
- match array.data_type() {
- DataType::UInt8 => filter_primitive_array!(self, array,
UInt8Array),
- DataType::UInt16 => filter_primitive_array!(self, array,
UInt16Array),
- DataType::UInt32 => filter_primitive_array!(self, array,
UInt32Array),
- DataType::UInt64 => filter_primitive_array!(self, array,
UInt64Array),
- DataType::Int8 => filter_primitive_array!(self, array, Int8Array),
- DataType::Int16 => filter_primitive_array!(self, array,
Int16Array),
- DataType::Int32 => filter_primitive_array!(self, array,
Int32Array),
- DataType::Int64 => filter_primitive_array!(self, array,
Int64Array),
- DataType::Float32 => filter_primitive_array!(self, array,
Float32Array),
- DataType::Float64 => filter_primitive_array!(self, array,
Float64Array),
- DataType::Boolean => {
- let input_array =
array.as_any().downcast_ref::<BooleanArray>().unwrap();
- let mut builder = BooleanArray::builder(self.filtered_count);
- for i in 0..self.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = self.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip
entire batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & self.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- builder.append_null()?;
- } else {
-
builder.append_value(input_array.value(data_index))?;
- }
- }
- }
- }
- Ok(Arc::new(builder.finish()))
- },
- DataType::Date32(_) => filter_primitive_array!(self, array,
Date32Array),
- DataType::Date64(_) => filter_primitive_array!(self, array,
Date64Array),
- DataType::Time32(TimeUnit::Second) => {
- filter_primitive_array!(self, array, Time32SecondArray)
- }
- DataType::Time32(TimeUnit::Millisecond) => {
- filter_primitive_array!(self, array, Time32MillisecondArray)
- }
- DataType::Time64(TimeUnit::Microsecond) => {
- filter_primitive_array!(self, array, Time64MicrosecondArray)
- }
- DataType::Time64(TimeUnit::Nanosecond) => {
- filter_primitive_array!(self, array, Time64NanosecondArray)
- }
- DataType::Duration(TimeUnit::Second) => {
- filter_primitive_array!(self, array, DurationSecondArray)
- }
- DataType::Duration(TimeUnit::Millisecond) => {
- filter_primitive_array!(self, array, DurationMillisecondArray)
- }
- DataType::Duration(TimeUnit::Microsecond) => {
- filter_primitive_array!(self, array, DurationMicrosecondArray)
- }
- DataType::Duration(TimeUnit::Nanosecond) => {
- filter_primitive_array!(self, array, DurationNanosecondArray)
- }
- DataType::Timestamp(TimeUnit::Second, _) => {
- filter_primitive_array!(self, array, TimestampSecondArray)
- }
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- filter_primitive_array!(self, array, TimestampMillisecondArray)
- }
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- filter_primitive_array!(self, array, TimestampMicrosecondArray)
- }
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- filter_primitive_array!(self, array, TimestampNanosecondArray)
- }
- DataType::Binary => {
- let input_array =
array.as_any().downcast_ref::<BinaryArray>().unwrap();
- let mut values: Vec<Option<&[u8]>> =
Vec::with_capacity(self.filtered_count);
- for i in 0..self.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = self.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip
entire batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & self.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- values.push(None)
- } else {
-
values.push(Some(input_array.value(data_index)))
- }
- }
- }
- }
- Ok(Arc::new(BinaryArray::from(values)))
- }
- DataType::Utf8 => {
- let input_array =
array.as_any().downcast_ref::<StringArray>().unwrap();
- let mut values: Vec<Option<&str>> =
Vec::with_capacity(self.filtered_count);
- for i in 0..self.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = self.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip
entire batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & self.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- values.push(None)
- } else {
-
values.push(Some(input_array.value(data_index)))
- }
- }
- }
- }
- Ok(Arc::new(StringArray::from(values)))
- }
- DataType::Dictionary(ref key_type, ref value_type) => match
(key_type.as_ref(), value_type.as_ref()) {
- (key_type, DataType::Utf8) => match key_type {
- DataType::UInt8 => filter_dictionary_array!(self, array,
UInt8DictionaryArray),
- DataType::UInt16 => filter_dictionary_array!(self, array,
UInt16DictionaryArray),
- DataType::UInt32 => filter_dictionary_array!(self, array,
UInt32DictionaryArray),
- DataType::UInt64 => filter_dictionary_array!(self, array,
UInt64DictionaryArray),
- DataType::Int8 => filter_dictionary_array!(self, array,
Int8DictionaryArray),
- DataType::Int16 => filter_dictionary_array!(self, array,
Int16DictionaryArray),
- DataType::Int32 => filter_dictionary_array!(self, array,
Int32DictionaryArray),
- DataType::Int64 => filter_dictionary_array!(self, array,
Int64DictionaryArray),
- other => Err(ArrowError::ComputeError(format!(
- "filter not supported for string dictionary with key
of type {:?}",
- other
- )))
- }
- (key_type, value_type) => Err(ArrowError::ComputeError(format!(
- "filter not supported for Dictionary({:?}, {:?})",
- key_type, value_type
- )))
- }
- DataType::List(dt) => match dt.data_type() {
- DataType::UInt8 => {
- filter_primitive_item_list_array!(self, array, UInt8Type,
ListArray, ListBuilder)
- }
- DataType::UInt16 => {
- filter_primitive_item_list_array!(self, array, UInt16Type,
ListArray, ListBuilder)
- }
- DataType::UInt32 => {
- filter_primitive_item_list_array!(self, array, UInt32Type,
ListArray, ListBuilder)
- }
- DataType::UInt64 => {
- filter_primitive_item_list_array!(self, array, UInt64Type,
ListArray, ListBuilder)
- }
- DataType::Int8 => filter_primitive_item_list_array!(self,
array, Int8Type, ListArray, ListBuilder),
- DataType::Int16 => {
- filter_primitive_item_list_array!(self, array, Int16Type,
ListArray, ListBuilder)
- }
- DataType::Int32 => {
- filter_primitive_item_list_array!(self, array, Int32Type,
ListArray, ListBuilder)
- }
- DataType::Int64 => {
- filter_primitive_item_list_array!(self, array, Int64Type,
ListArray, ListBuilder)
- }
- DataType::Float32 => {
- filter_primitive_item_list_array!(self, array,
Float32Type, ListArray, ListBuilder)
- }
- DataType::Float64 => {
- filter_primitive_item_list_array!(self, array,
Float64Type, ListArray, ListBuilder)
- }
- DataType::Boolean => {
- filter_boolean_item_list_array!(self, array, ListArray,
ListBuilder)
- }
- DataType::Date32(_) => {
- filter_primitive_item_list_array!(self, array, Date32Type,
ListArray, ListBuilder)
- }
- DataType::Date64(_) => {
- filter_primitive_item_list_array!(self, array, Date64Type,
ListArray, ListBuilder)
- }
- DataType::Time32(TimeUnit::Second) => {
- filter_primitive_item_list_array!(self, array,
Time32SecondType, ListArray, ListBuilder)
- }
- DataType::Time32(TimeUnit::Millisecond) => {
- filter_primitive_item_list_array!(self, array,
Time32MillisecondType, ListArray, ListBuilder)
- }
- DataType::Time64(TimeUnit::Microsecond) => {
- filter_primitive_item_list_array!(self, array,
Time64MicrosecondType, ListArray, ListBuilder)
- }
- DataType::Time64(TimeUnit::Nanosecond) => {
- filter_primitive_item_list_array!(self, array,
Time64NanosecondType, ListArray, ListBuilder)
- }
- DataType::Duration(TimeUnit::Second) => {
- filter_primitive_item_list_array!(self, array,
DurationSecondType, ListArray, ListBuilder)
- }
- DataType::Duration(TimeUnit::Millisecond) => {
- filter_primitive_item_list_array!(self, array,
DurationMillisecondType, ListArray, ListBuilder)
- }
- DataType::Duration(TimeUnit::Microsecond) => {
- filter_primitive_item_list_array!(self, array,
DurationMicrosecondType, ListArray, ListBuilder)
- }
- DataType::Duration(TimeUnit::Nanosecond) => {
- filter_primitive_item_list_array!(self, array,
DurationNanosecondType, ListArray, ListBuilder)
- }
- DataType::Timestamp(TimeUnit::Second, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampSecondType, ListArray, ListBuilder)
- }
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampMillisecondType, ListArray, ListBuilder)
- }
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampMicrosecondType, ListArray, ListBuilder)
- }
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampNanosecondType, ListArray, ListBuilder)
- }
- DataType::Binary => filter_non_primitive_item_list_array!(
- self,
- array,
- BinaryArray,
- BinaryBuilder,
- ListArray,
- ListBuilder
- ),
- DataType::LargeBinary => filter_non_primitive_item_list_array!(
- self,
- array,
- LargeBinaryArray,
- LargeBinaryBuilder,
- ListArray,
- ListBuilder
- ),
- DataType::Utf8 => filter_non_primitive_item_list_array!(
- self,
- array,
- StringArray,
- StringBuilder,
- ListArray
- ,ListBuilder
- ),
- DataType::LargeUtf8 => filter_non_primitive_item_list_array!(
- self,
- array,
- LargeStringArray,
- LargeStringBuilder,
- ListArray,
- ListBuilder
- ),
- other => {
- Err(ArrowError::ComputeError(format!(
- "filter not supported for List({:?})",
- other
- )))
- }
- }
- DataType::LargeList(dt) => match dt.data_type() {
- DataType::UInt8 => {
- filter_primitive_item_list_array!(self, array, UInt8Type,
LargeListArray, LargeListBuilder)
- }
- DataType::UInt16 => {
- filter_primitive_item_list_array!(self, array, UInt16Type,
LargeListArray, LargeListBuilder)
- }
- DataType::UInt32 => {
- filter_primitive_item_list_array!(self, array, UInt32Type,
LargeListArray, LargeListBuilder)
- }
- DataType::UInt64 => {
- filter_primitive_item_list_array!(self, array, UInt64Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Int8 => filter_primitive_item_list_array!(self,
array, Int8Type, LargeListArray, LargeListBuilder),
- DataType::Int16 => {
- filter_primitive_item_list_array!(self, array, Int16Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Int32 => {
- filter_primitive_item_list_array!(self, array, Int32Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Int64 => {
- filter_primitive_item_list_array!(self, array, Int64Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Float32 => {
- filter_primitive_item_list_array!(self, array,
Float32Type, LargeListArray, LargeListBuilder)
- }
- DataType::Float64 => {
- filter_primitive_item_list_array!(self, array,
Float64Type, LargeListArray, LargeListBuilder)
- }
- DataType::Boolean => {
- filter_boolean_item_list_array!(self, array,
LargeListArray, LargeListBuilder)
- }
- DataType::Date32(_) => {
- filter_primitive_item_list_array!(self, array, Date32Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Date64(_) => {
- filter_primitive_item_list_array!(self, array, Date64Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Time32(TimeUnit::Second) => {
- filter_primitive_item_list_array!(self, array,
Time32SecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Time32(TimeUnit::Millisecond) => {
- filter_primitive_item_list_array!(self, array,
Time32MillisecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Time64(TimeUnit::Microsecond) => {
- filter_primitive_item_list_array!(self, array,
Time64MicrosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Time64(TimeUnit::Nanosecond) => {
- filter_primitive_item_list_array!(self, array,
Time64NanosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Duration(TimeUnit::Second) => {
- filter_primitive_item_list_array!(self, array,
DurationSecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Duration(TimeUnit::Millisecond) => {
- filter_primitive_item_list_array!(self, array,
DurationMillisecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Duration(TimeUnit::Microsecond) => {
- filter_primitive_item_list_array!(self, array,
DurationMicrosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Duration(TimeUnit::Nanosecond) => {
- filter_primitive_item_list_array!(self, array,
DurationNanosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Timestamp(TimeUnit::Second, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampSecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampMillisecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampMicrosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampNanosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Binary => filter_non_primitive_item_list_array!(
- self,
- array,
- BinaryArray,
- BinaryBuilder,
- LargeListArray,
- LargeListBuilder
- ),
- DataType::LargeBinary => filter_non_primitive_item_list_array!(
- self,
- array,
- LargeBinaryArray,
- LargeBinaryBuilder,
- LargeListArray,
- LargeListBuilder
- ),
- DataType::Utf8 => filter_non_primitive_item_list_array!(
- self,
- array,
- StringArray,
- StringBuilder,
- LargeListArray,
- LargeListBuilder
- ),
- DataType::LargeUtf8 => filter_non_primitive_item_list_array!(
- self,
- array,
- LargeStringArray,
- LargeStringBuilder,
- LargeListArray,
- LargeListBuilder
- ),
- other => {
- Err(ArrowError::ComputeError(format!(
- "filter not supported for LargeList({:?})",
- other
- )))
- }
- }
- other => Err(ArrowError::ComputeError(format!(
- "filter not supported for {:?}",
- other
- ))),
- }
- }
-
- /// Returns a new PrimitiveArray<T> containing only those values from the
array passed as the data_array parameter,
- /// selected by the BooleanArray passed as the filter_array parameter
- pub fn filter_primitive_array<T>(
- &self,
- data_array: &PrimitiveArray<T>,
- ) -> Result<PrimitiveArray<T>>
- where
- T: ArrowNumericType,
- {
- let array_type = T::DATA_TYPE;
- let value_size = mem::size_of::<T::Native>();
- let array_data_builder =
- filter_array_impl(self, data_array, array_type, value_size)?;
- let data = array_data_builder.build();
- Ok(PrimitiveArray::<T>::from(data))
- }
-
- /// Returns a new DictionaryArray<T> containing only those keys from the
array passed as the data_array parameter,
- /// selected by the BooleanArray passed as the filter_array parameter. The
values are cloned from the data_array.
- pub fn filter_dictionary_array<T>(
- &self,
- data_array: &DictionaryArray<T>,
- ) -> Result<DictionaryArray<T>>
- where
- T: ArrowNumericType,
- {
- let array_type = data_array.data_type().clone();
- let value_size = mem::size_of::<T::Native>();
- let mut array_data_builder =
- filter_array_impl(self, data_array, array_type, value_size)?;
- // copy dictionary values from input array
- array_data_builder =
- array_data_builder.add_child_data(data_array.values().data());
- let data = array_data_builder.build();
- Ok(DictionaryArray::<T>::from(data))
}
}
-/// Returns a new array, containing only the elements matching the filter.
-pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
- FilterContext::new(filter)?.filter(array)
+/// Returns a function used to filter arbitrary arrays.
+/// This is faster (2x for primitive types) than using [filter] on multiple arrays, but slower
+/// than [filter] when filtering a single array.
Review comment:
```suggestion
/// Returns a prepared function which can be applied to filter any number of arbitrary arrays.
///
/// You should use [filter] when filtering a single array and `build_filter` when filtering multiple arrays.
///
/// Creating this function requires time, but the prepared function is faster than [filter] when the
/// same filtering must be applied to multiple arrays (e.g. a multi-column `RecordBatch`).
```
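As a usage note, a hedged sketch of the intended trade-off. The plumbing here — `build_filter(&BooleanArray) -> Result<Filter>`, `data_ref()`, `Arc::new`, and `make_array` — is assumed from the `Filter` type alias and the tests in this file, not confirmed against the final API:
```
use std::sync::Arc;

let mask = BooleanArray::from(vec![true, false, true]);
let ints = Int32Array::from(vec![1, 2, 3]);
let strings = StringArray::from(vec!["a", "b", "c"]);

// many arrays sharing one mask: prepare the filter once, apply it per array
let predicate = build_filter(&mask)?;
let filtered_ints = make_array(Arc::new(predicate(ints.data_ref())));
let filtered_strings = make_array(Arc::new(predicate(strings.data_ref())));

// a single array: plain `filter` is simpler and avoids the preparation cost
let filtered_once = filter(&ints, &mask)?;
```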
##########
File path: rust/arrow/src/array/transform/variable_size.rs
##########
@@ -43,33 +47,33 @@ pub(super) fn build_extend<T: OffsetSizeTrait>(array:
&ArrayData) -> Extend {
// fast case where we can copy regions without null issues
Box::new(
move |mutable: &mut _MutableArrayData, _, start: usize, len:
usize| {
- let mutable_offsets = mutable.buffer::<T>(0);
- let last_offset = mutable_offsets[mutable_offsets.len() - 1];
- // offsets
- let buffer = &mut mutable.buffers[0];
+ let offset_buffer = &mut mutable.buffer1;
+ let values_buffer = &mut mutable.buffer2;
+
+ // this is safe due to how offset is built. See details on
`get_last_offset`
+ let last_offset = unsafe { get_last_offset(offset_buffer) };
Review comment:
I find the use of `unsafe` to call `get_last_offset` here somewhat confusing. I suspect you are trying to follow the unsafe guidelines and make it clear where unsafe code is being used.
However, in this case the only thing the caller can do is trust that the `MutableBuffer` it was passed was created correctly. Forcing callers to write `unsafe` in order to call `get_last_offset`, even though they can do nothing to ensure or validate safety, seems unnecessarily confusing to me.
I would personally suggest making `get_last_offset` an associated function, such as `MutableBuffer::get_last_offset`, and then changing calls such as this one to:
```
let last_offset = offset_buffer.get_last_offset();
```
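For illustration, a minimal sketch of what such an associated function could look like (the visibility, trait bounds, and `data()` accessor are assumptions based on this PR, not the final API):
```
impl MutableBuffer {
    /// Returns the last offset stored in a buffer holding offsets of type `T`.
    /// The buffer is only ever extended in whole `T`s and is initialized with
    /// one element (0), so the slice is aligned and non-empty by construction.
    #[inline]
    pub(crate) fn get_last_offset<T: OffsetSizeTrait>(&self) -> T {
        // the unsafe reinterpretation is encapsulated here, so callers
        // do not need their own `unsafe` block
        let offsets = unsafe { self.data().align_to::<T>().1 };
        offsets[offsets.len() - 1]
    }
}
```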
##########
File path: rust/arrow/src/compute/kernels/filter.rs
##########
@@ -1131,36 +520,64 @@ mod tests {
// a = [[0, 1, 2], [3, 4, 5], [6, 7], null]
let a = LargeListArray::from(list_data);
let b = BooleanArray::from(vec![false, true, false, true]);
- let c = filter(&a, &b).unwrap();
- let d = c
- .as_ref()
- .as_any()
- .downcast_ref::<LargeListArray>()
- .unwrap();
+ let result = filter(&a, &b).unwrap();
- assert_eq!(DataType::Int32, d.value_type());
+ // expected: [[3, 4, 5], null]
+ let value_data = ArrayData::builder(DataType::Int32)
+ .len(3)
+ .add_buffer(Buffer::from(&[3, 4, 5].to_byte_slice()))
+ .build();
- // result should be [[3, 4, 5], null]
- assert_eq!(2, d.len());
- assert_eq!(1, d.null_count());
- assert_eq!(true, d.is_null(1));
+ let value_offsets = Buffer::from(&[0i64, 3, 3].to_byte_slice());
+
+ let list_data_type =
+ DataType::LargeList(Box::new(Field::new("item", DataType::Int32,
false)));
+ let expected = ArrayData::builder(list_data_type)
+ .len(2)
+ .add_buffer(value_offsets)
+ .add_child_data(value_data)
+ .null_bit_buffer(Buffer::from([0b00000001]))
+ .build();
+
+ assert_eq!(&make_array(expected), &result);
+ }
+
+ #[test]
+ fn test_slice_iterator_bits() {
+ let filter_values = (0..64).map(|i| i == 1).collect::<Vec<bool>>();
+ let filter = BooleanArray::from(filter_values);
+
+ let iter = SlicesIterator::new(&filter);
+ let filter_count = iter.filter_count;
+ let chunks = iter.collect::<Vec<_>>();
+
+ assert_eq!(chunks, vec![(1, 2)]);
+ assert_eq!(filter_count, 1);
+ }
+
+ #[test]
+ fn test_slice_iterator_bits1() {
+ let filter_values = (0..64).map(|i| i != 1).collect::<Vec<bool>>();
+ let filter = BooleanArray::from(filter_values);
+
+ let iter = SlicesIterator::new(&filter);
+ let filter_count = iter.filter_count;
+ let chunks = iter.collect::<Vec<_>>();
+
+ assert_eq!(chunks, vec![(0, 1), (2, 64)]);
+ assert_eq!(filter_count, 64 - 1);
+ }
+
+ #[test]
+ fn test_slice_iterator_chunk_and_bits() {
+ let filter_values = (0..127).map(|i| i % 62 !=
0).collect::<Vec<bool>>();
Review comment:
Given the algorithm's use of 64-bit chunks, I recommend testing an array whose length is not a multiple of 64, ideally something of length `192 + 17`, or something that would also test the transition `State::Chunks` --> `State::Bits` --> `State::Chunks`.
I may have missed this in reviewing the tests.
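For example, a sketch of such a test (the filter pattern is illustrative: length `192 + 17` with a single false bit inside the second chunk, so iteration passes through `State::Chunks` --> `State::Bits` --> `State::Chunks` --> `State::Remainder`):
```
#[test]
fn test_slice_iterator_chunks_bits_chunks() {
    // 192 + 17 = 209 slots: three full 64-bit chunks plus a 17-bit remainder,
    // with the only false bit at index 65 (inside the second chunk)
    let filter_values = (0..(192 + 17)).map(|i| i != 65).collect::<Vec<bool>>();
    let filter = BooleanArray::from(filter_values);

    let iter = SlicesIterator::new(&filter);
    let filter_count = iter.filter_count;
    let chunks = iter.collect::<Vec<_>>();

    assert_eq!(chunks, vec![(0, 65), (66, 209)]);
    assert_eq!(filter_count, 209 - 1);
}
```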
##########
File path: rust/arrow/src/array/transform/mod.rs
##########
@@ -298,79 +290,137 @@ impl<'a> MutableArrayData<'a> {
use_nulls = true;
};
- let buffers = match &data_type {
+ let empty_buffer = MutableBuffer::new(0);
+ let buffers: [MutableBuffer; 2] = match &data_type {
Review comment:
Very minor suggestion:
```suggestion
let (buffer1, buffer2) = match &data_type {
```
And you can remove the destructuring below.
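A generic, standalone illustration of the pattern being suggested (the names, element types, and match arms are placeholders, not the PR's actual logic):
```
fn main() {
    let kind = "variable";

    // before: build an array, then destructure it in a separate step
    let buffers: [Vec<u8>; 2] = match kind {
        "variable" => [Vec::with_capacity(8), Vec::with_capacity(64)],
        _ => [Vec::with_capacity(8), Vec::new()],
    };
    let [offsets, values] = buffers;

    // after: bind both buffers directly from the match, no destructuring needed
    let (offsets2, values2): (Vec<u8>, Vec<u8>) = match kind {
        "variable" => (Vec::with_capacity(8), Vec::with_capacity(64)),
        _ => (Vec::with_capacity(8), Vec::new()),
    };

    assert_eq!(offsets.capacity(), offsets2.capacity());
    assert_eq!(values.capacity(), values2.capacity());
}
```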
##########
File path: rust/arrow/src/compute/kernels/filter.rs
##########
@@ -17,841 +17,241 @@
//! Defines miscellaneous array kernels.
-use crate::array::*;
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
+use crate::error::Result;
use crate::record_batch::RecordBatch;
-use crate::{
- bitmap::Bitmap,
- buffer::{Buffer, MutableBuffer},
- util::bit_util,
-};
-use std::{mem, sync::Arc};
-
-/// trait for copying filtered null bitmap bits
-trait CopyNullBit {
- fn copy_null_bit(&mut self, source_index: usize);
- fn copy_null_bits(&mut self, source_index: usize, count: usize);
- fn null_count(&self) -> usize;
- fn null_buffer(&mut self) -> Buffer;
-}
-
-/// no-op null bitmap copy implementation,
-/// used when the filtered data array doesn't have a null bitmap
-struct NullBitNoop {}
-
-impl NullBitNoop {
- fn new() -> Self {
- NullBitNoop {}
- }
-}
-
-impl CopyNullBit for NullBitNoop {
- #[inline]
- fn copy_null_bit(&mut self, _source_index: usize) {
- // do nothing
- }
-
- #[inline]
- fn copy_null_bits(&mut self, _source_index: usize, _count: usize) {
- // do nothing
- }
-
- fn null_count(&self) -> usize {
- 0
- }
-
- fn null_buffer(&mut self) -> Buffer {
- Buffer::from([0u8; 0])
- }
+use crate::{array::*, util::bit_chunk_iterator::BitChunkIterator};
+use std::{iter::Enumerate, sync::Arc};
+
+/// Function that can filter arbitrary arrays
+pub type Filter<'a> = Box<Fn(&ArrayData) -> ArrayData + 'a>;
+
+/// Internal state of [SlicesIterator]
+#[derive(Debug, PartialEq)]
+enum State {
+ // it is iterating over bits of a mask (`u64`, steps of size of 1 slot)
+ Bits(u64),
+ // it is iterating over chunks (steps of size of 64 slots)
+ Chunks,
+ // it is iterating over the remainding bits (steps of size of 1 slot)
+ Remainder,
+ // nothing more to iterate.
+ Finish,
}
-/// null bitmap copy implementation,
-/// used when the filtered data array has a null bitmap
-struct NullBitSetter<'a> {
- target_buffer: MutableBuffer,
- source_bytes: &'a [u8],
- target_index: usize,
- null_count: usize,
+/// An iterator of `(usize, usize)` each representing an interval
`[start,end[` whose
+/// slots of a [BooleanArray] are true. Each interval corresponds to a
contiguous region of memory to be
+/// "taken" from an array to be filtered.
+#[derive(Debug)]
+struct SlicesIterator<'a> {
+ iter: Enumerate<BitChunkIterator<'a>>,
+ state: State,
+ filter_count: usize,
+ remainder_mask: u64,
+ remainder_len: usize,
+ chunk_len: usize,
+ len: usize,
+ start: usize,
+ on_region: bool,
+ current_chunk: usize,
+ current_bit: usize,
}
-impl<'a> NullBitSetter<'a> {
- fn new(null_bitmap: &'a Bitmap) -> Self {
- let null_bytes = null_bitmap.buffer_ref().data();
- // create null bitmap buffer with same length and initialize null
bitmap buffer to 1s
- let null_buffer =
- MutableBuffer::new(null_bytes.len()).with_bitset(null_bytes.len(),
true);
- NullBitSetter {
- source_bytes: null_bytes,
- target_buffer: null_buffer,
- target_index: 0,
- null_count: 0,
+impl<'a> SlicesIterator<'a> {
+ fn new(filter: &'a BooleanArray) -> Self {
+ let values = &filter.data_ref().buffers()[0];
+
+ // this operation is performed before iteration
+ // because it is fast and allows reserving all the needed memory
+ let filter_count = values.count_set_bits_offset(filter.offset(),
filter.len());
+
+ let chunks = values.bit_chunks(filter.offset(), filter.len());
+
+ Self {
+ iter: chunks.iter().enumerate(),
+ state: State::Chunks,
+ filter_count,
+ remainder_len: chunks.remainder_len(),
+ chunk_len: chunks.chunk_len(),
+ remainder_mask: chunks.remainder_bits(),
+ len: 0,
+ start: 0,
+ on_region: false,
+ current_chunk: 0,
+ current_bit: 0,
}
}
-}
-impl<'a> CopyNullBit for NullBitSetter<'a> {
#[inline]
- fn copy_null_bit(&mut self, source_index: usize) {
- if !bit_util::get_bit(self.source_bytes, source_index) {
- bit_util::unset_bit(self.target_buffer.data_mut(),
self.target_index);
- self.null_count += 1;
- }
- self.target_index += 1;
+ fn current_start(&self) -> usize {
+ self.current_chunk * 64 + self.current_bit
}
#[inline]
- fn copy_null_bits(&mut self, source_index: usize, count: usize) {
- for i in 0..count {
- self.copy_null_bit(source_index + i);
+ fn iterate_bits(&mut self, mask: u64, max: usize) -> Option<(usize,
usize)> {
+ while self.current_bit < max {
+ if (mask & (1 << self.current_bit)) != 0 {
+ if !self.on_region {
+ self.start = self.current_start();
+ self.on_region = true;
+ }
+ self.len += 1;
+ } else if self.on_region {
+ let result = (self.start, self.start + self.len);
+ self.len = 0;
+ self.on_region = false;
+ self.current_bit += 1;
+ return Some(result);
+ }
+ self.current_bit += 1;
}
+ self.current_bit = 0;
+ None
}
- fn null_count(&self) -> usize {
- self.null_count
- }
-
- fn null_buffer(&mut self) -> Buffer {
- self.target_buffer.resize(self.target_index);
- // use mem::replace to detach self.target_buffer from self so that it
can be returned
- let target_buffer = mem::replace(&mut self.target_buffer,
MutableBuffer::new(0));
- target_buffer.freeze()
- }
-}
-
-fn get_null_bit_setter<'a>(data_array: &'a impl Array) -> Box<CopyNullBit +
'a> {
- if let Some(null_bitmap) = data_array.data_ref().null_bitmap() {
- // only return an actual null bit copy implementation if null_bitmap
is set
- Box::new(NullBitSetter::new(null_bitmap))
- } else {
- // otherwise return a no-op copy null bit implementation
- // for improved performance when the filtered array doesn't contain
NULLs
- Box::new(NullBitNoop::new())
- }
-}
-
-// transmute filter array to u64
-// - optimize filtering with highly selective filters by skipping entire
batches of 64 filter bits
-// - if the data array being filtered doesn't have a null bitmap, no time is
wasted to copy a null bitmap
-fn filter_array_impl(
- filter_context: &FilterContext,
- data_array: &impl Array,
- array_type: DataType,
- value_size: usize,
-) -> Result<ArrayDataBuilder> {
- if filter_context.filter_len > data_array.len() {
- return Err(ArrowError::ComputeError(
- "Filter array cannot be larger than data array".to_string(),
- ));
- }
- let filtered_count = filter_context.filtered_count;
- let filter_mask = &filter_context.filter_mask;
- let filter_u64 = &filter_context.filter_u64;
- let data_bytes = data_array.data_ref().buffers()[0].data();
- let mut target_buffer = MutableBuffer::new(filtered_count * value_size);
- target_buffer.resize(filtered_count * value_size);
- let target_bytes = target_buffer.data_mut();
- let mut target_byte_index: usize = 0;
- let mut null_bit_setter = get_null_bit_setter(data_array);
- let null_bit_setter = null_bit_setter.as_mut();
- let all_ones_batch = !0u64;
- let data_array_offset = data_array.offset();
-
- for (i, filter_batch) in filter_u64.iter().enumerate() {
- // foreach u64 batch
- let filter_batch = *filter_batch;
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip entire batch
- continue;
- } else if filter_batch == all_ones_batch {
- // if batch == all 1s: copy all 64 values in one go
- let data_index = (i * 64) + data_array_offset;
- null_bit_setter.copy_null_bits(data_index, 64);
- let data_byte_index = data_index * value_size;
- let data_len = value_size * 64;
- target_bytes[target_byte_index..(target_byte_index + data_len)]
- .copy_from_slice(
- &data_bytes[data_byte_index..(data_byte_index + data_len)],
- );
- target_byte_index += data_len;
- continue;
- }
- for (j, filter_mask) in filter_mask.iter().enumerate() {
- // foreach bit in batch:
- if (filter_batch & *filter_mask) != 0 {
- let data_index = (i * 64) + j + data_array_offset;
- null_bit_setter.copy_null_bit(data_index);
- // if filter bit == 1: copy data value bytes
- let data_byte_index = data_index * value_size;
- target_bytes[target_byte_index..(target_byte_index +
value_size)]
- .copy_from_slice(
- &data_bytes[data_byte_index..(data_byte_index +
value_size)],
- );
- target_byte_index += value_size;
+ /// iterates over chunks.
+ #[inline]
+ fn iterate_chunks(&mut self) -> Option<(usize, usize)> {
+ while let Some((i, mask)) = self.iter.next() {
+ self.current_chunk = i;
+ if mask == 0 {
+ if self.on_region {
+ let result = (self.start, self.start + self.len);
+ self.len = 0;
+ self.on_region = false;
+ return Some(result);
+ }
+ } else if mask == 18446744073709551615u64 {
+ // = !0u64
+ if !self.on_region {
+ self.start = self.current_start();
+ self.on_region = true;
+ }
+ self.len += 64;
+ } else {
+ // there is a chunk that has a non-trivial mask => iterate
over bits.
+ self.state = State::Bits(mask);
+ return None;
}
}
+ // no more chunks => start iterating over the remainder
+ self.current_chunk = self.chunk_len;
+ self.state = State::Remainder;
+ None
}
-
- let mut array_data_builder = ArrayDataBuilder::new(array_type)
- .len(filtered_count)
- .add_buffer(target_buffer.freeze());
- if null_bit_setter.null_count() > 0 {
- array_data_builder = array_data_builder
- .null_count(null_bit_setter.null_count())
- .null_bit_buffer(null_bit_setter.null_buffer());
- }
-
- Ok(array_data_builder)
-}
-
-/// FilterContext can be used to improve performance when
-/// filtering multiple data arrays with the same filter array.
-#[derive(Debug)]
-pub struct FilterContext {
- filter_u64: Vec<u64>,
- filter_len: usize,
- filtered_count: usize,
- filter_mask: Vec<u64>,
-}
-
-macro_rules! filter_primitive_array {
- ($context:expr, $array:expr, $array_type:ident) => {{
- let input_array =
$array.as_any().downcast_ref::<$array_type>().unwrap();
- let output_array = $context.filter_primitive_array(input_array)?;
- Ok(Arc::new(output_array))
- }};
-}
-
-macro_rules! filter_dictionary_array {
- ($context:expr, $array:expr, $array_type:ident) => {{
- let input_array =
$array.as_any().downcast_ref::<$array_type>().unwrap();
- let output_array = $context.filter_dictionary_array(input_array)?;
- Ok(Arc::new(output_array))
- }};
}
-macro_rules! filter_boolean_item_list_array {
- ($context:expr, $array:expr, $list_type:ident, $list_builder_type:ident)
=> {{
- let input_array =
$array.as_any().downcast_ref::<$list_type>().unwrap();
- let values_builder = BooleanBuilder::new($context.filtered_count);
- let mut builder = $list_builder_type::new(values_builder);
- for i in 0..$context.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = $context.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip entire
batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & $context.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- builder.append(false)?;
- } else {
- let this_inner_list = input_array.value(data_index);
- let inner_list = this_inner_list
- .as_any()
- .downcast_ref::<BooleanArray>()
- .unwrap();
- for k in 0..inner_list.len() {
- if inner_list.is_null(k) {
- builder.values().append_null()?;
- } else {
-
builder.values().append_value(inner_list.value(k))?;
- }
- }
- builder.append(true)?;
+impl<'a> Iterator for SlicesIterator<'a> {
+ type Item = (usize, usize);
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self.state {
+ State::Chunks => {
+ match self.iterate_chunks() {
+ None => {
+ // iterating over chunks does not yield any new slice
=> continue to the next
+ self.current_bit = 0;
+ self.next()
}
+ other => other,
}
}
- }
- Ok(Arc::new(builder.finish()))
- }};
-}
-
-macro_rules! filter_primitive_item_list_array {
- ($context:expr, $array:expr, $item_type:ident, $list_type:ident,
$list_builder_type:ident) => {{
- let input_array =
$array.as_any().downcast_ref::<$list_type>().unwrap();
- let values_builder =
PrimitiveBuilder::<$item_type>::new($context.filtered_count);
- let mut builder = $list_builder_type::new(values_builder);
- for i in 0..$context.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = $context.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip entire
batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & $context.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- builder.append(false)?;
- } else {
- let this_inner_list = input_array.value(data_index);
- let inner_list = this_inner_list
- .as_any()
- .downcast_ref::<PrimitiveArray<$item_type>>()
- .unwrap();
- for k in 0..inner_list.len() {
- if inner_list.is_null(k) {
- builder.values().append_null()?;
- } else {
-
builder.values().append_value(inner_list.value(k))?;
- }
- }
- builder.append(true)?;
+ State::Bits(mask) => {
+ match self.iterate_bits(mask, 64) {
+ None => {
+ // iterating over bits does not yield any new slice =>
change back
+ // to chunks and continue to the next
+ self.state = State::Chunks;
+ self.next()
}
+ other => other,
}
}
- }
- Ok(Arc::new(builder.finish()))
- }};
-}
-
-macro_rules! filter_non_primitive_item_list_array {
- ($context:expr, $array:expr, $item_array_type:ident, $item_builder:ident,
$list_type:ident, $list_builder_type:ident) => {{
- let input_array =
$array.as_any().downcast_ref::<$list_type>().unwrap();
- let values_builder = $item_builder::new($context.filtered_count);
- let mut builder = $list_builder_type::new(values_builder);
- for i in 0..$context.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = $context.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip entire
batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & $context.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- builder.append(false)?;
- } else {
- let this_inner_list = input_array.value(data_index);
- let inner_list = this_inner_list
- .as_any()
- .downcast_ref::<$item_array_type>()
- .unwrap();
- for k in 0..inner_list.len() {
- if inner_list.is_null(k) {
- builder.values().append_null()?;
- } else {
-
builder.values().append_value(inner_list.value(k))?;
- }
+ State::Remainder => {
+ match self.iterate_bits(self.remainder_mask,
self.remainder_len) {
+ None => {
+ self.state = State::Finish;
+ if self.on_region {
+ Some((self.start, self.start + self.len))
+ } else {
+ None
}
- builder.append(true)?;
}
+ other => other,
}
}
+ State::Finish => None,
}
- Ok(Arc::new(builder.finish()))
- }};
-}
-
-impl FilterContext {
- /// Returns a new instance of FilterContext
- pub fn new(filter_array: &BooleanArray) -> Result<Self> {
- if filter_array.offset() > 0 {
- return Err(ArrowError::ComputeError(
- "Filter array cannot have offset > 0".to_string(),
- ));
- }
- let filter_mask: Vec<u64> = (0..64).map(|x| 1u64 << x).collect();
- let filter_buffer = &filter_array.data_ref().buffers()[0];
- let filtered_count = filter_buffer.count_set_bits_offset(0,
filter_array.len());
-
- let filter_bytes = filter_buffer.data();
-
- // add to the resulting len so is is a multiple of the size of u64
- let pad_addional_len = 8 - filter_bytes.len() % 8;
-
- // transmute filter_bytes to &[u64]
- let mut u64_buffer = MutableBuffer::new(filter_bytes.len() +
pad_addional_len);
-
- u64_buffer.extend_from_slice(filter_bytes);
- u64_buffer.extend_from_slice(&vec![0; pad_addional_len]);
- let mut filter_u64 = u64_buffer.typed_data_mut::<u64>().to_owned();
-
- // mask of any bits outside of the given len
- if filter_array.len() % 64 != 0 {
- let last_idx = filter_u64.len() - 1;
- let mask = u64::MAX >> (64 - filter_array.len() % 64);
- filter_u64[last_idx] &= mask;
- }
-
- Ok(FilterContext {
- filter_u64,
- filter_len: filter_array.len(),
- filtered_count,
- filter_mask,
- })
- }
-
- /// Returns a new array, containing only the elements matching the filter
- pub fn filter(&self, array: &Array) -> Result<ArrayRef> {
- match array.data_type() {
- DataType::UInt8 => filter_primitive_array!(self, array,
UInt8Array),
- DataType::UInt16 => filter_primitive_array!(self, array,
UInt16Array),
- DataType::UInt32 => filter_primitive_array!(self, array,
UInt32Array),
- DataType::UInt64 => filter_primitive_array!(self, array,
UInt64Array),
- DataType::Int8 => filter_primitive_array!(self, array, Int8Array),
- DataType::Int16 => filter_primitive_array!(self, array,
Int16Array),
- DataType::Int32 => filter_primitive_array!(self, array,
Int32Array),
- DataType::Int64 => filter_primitive_array!(self, array,
Int64Array),
- DataType::Float32 => filter_primitive_array!(self, array,
Float32Array),
- DataType::Float64 => filter_primitive_array!(self, array,
Float64Array),
- DataType::Boolean => {
- let input_array =
array.as_any().downcast_ref::<BooleanArray>().unwrap();
- let mut builder = BooleanArray::builder(self.filtered_count);
- for i in 0..self.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = self.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip
entire batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & self.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- builder.append_null()?;
- } else {
-
builder.append_value(input_array.value(data_index))?;
- }
- }
- }
- }
- Ok(Arc::new(builder.finish()))
- },
- DataType::Date32(_) => filter_primitive_array!(self, array,
Date32Array),
- DataType::Date64(_) => filter_primitive_array!(self, array,
Date64Array),
- DataType::Time32(TimeUnit::Second) => {
- filter_primitive_array!(self, array, Time32SecondArray)
- }
- DataType::Time32(TimeUnit::Millisecond) => {
- filter_primitive_array!(self, array, Time32MillisecondArray)
- }
- DataType::Time64(TimeUnit::Microsecond) => {
- filter_primitive_array!(self, array, Time64MicrosecondArray)
- }
- DataType::Time64(TimeUnit::Nanosecond) => {
- filter_primitive_array!(self, array, Time64NanosecondArray)
- }
- DataType::Duration(TimeUnit::Second) => {
- filter_primitive_array!(self, array, DurationSecondArray)
- }
- DataType::Duration(TimeUnit::Millisecond) => {
- filter_primitive_array!(self, array, DurationMillisecondArray)
- }
- DataType::Duration(TimeUnit::Microsecond) => {
- filter_primitive_array!(self, array, DurationMicrosecondArray)
- }
- DataType::Duration(TimeUnit::Nanosecond) => {
- filter_primitive_array!(self, array, DurationNanosecondArray)
- }
- DataType::Timestamp(TimeUnit::Second, _) => {
- filter_primitive_array!(self, array, TimestampSecondArray)
- }
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- filter_primitive_array!(self, array, TimestampMillisecondArray)
- }
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- filter_primitive_array!(self, array, TimestampMicrosecondArray)
- }
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- filter_primitive_array!(self, array, TimestampNanosecondArray)
- }
- DataType::Binary => {
- let input_array =
array.as_any().downcast_ref::<BinaryArray>().unwrap();
- let mut values: Vec<Option<&[u8]>> =
Vec::with_capacity(self.filtered_count);
- for i in 0..self.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = self.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip
entire batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & self.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- values.push(None)
- } else {
-
values.push(Some(input_array.value(data_index)))
- }
- }
- }
- }
- Ok(Arc::new(BinaryArray::from(values)))
- }
- DataType::Utf8 => {
- let input_array =
array.as_any().downcast_ref::<StringArray>().unwrap();
- let mut values: Vec<Option<&str>> =
Vec::with_capacity(self.filtered_count);
- for i in 0..self.filter_u64.len() {
- // foreach u64 batch
- let filter_batch = self.filter_u64[i];
- if filter_batch == 0 {
- // if batch == 0, all items are filtered out, so skip
entire batch
- continue;
- }
- for j in 0..64 {
- // foreach bit in batch:
- if (filter_batch & self.filter_mask[j]) != 0 {
- let data_index = (i * 64) + j;
- if input_array.is_null(data_index) {
- values.push(None)
- } else {
-
values.push(Some(input_array.value(data_index)))
- }
- }
- }
- }
- Ok(Arc::new(StringArray::from(values)))
- }
- DataType::Dictionary(ref key_type, ref value_type) => match
(key_type.as_ref(), value_type.as_ref()) {
- (key_type, DataType::Utf8) => match key_type {
- DataType::UInt8 => filter_dictionary_array!(self, array,
UInt8DictionaryArray),
- DataType::UInt16 => filter_dictionary_array!(self, array,
UInt16DictionaryArray),
- DataType::UInt32 => filter_dictionary_array!(self, array,
UInt32DictionaryArray),
- DataType::UInt64 => filter_dictionary_array!(self, array,
UInt64DictionaryArray),
- DataType::Int8 => filter_dictionary_array!(self, array,
Int8DictionaryArray),
- DataType::Int16 => filter_dictionary_array!(self, array,
Int16DictionaryArray),
- DataType::Int32 => filter_dictionary_array!(self, array,
Int32DictionaryArray),
- DataType::Int64 => filter_dictionary_array!(self, array,
Int64DictionaryArray),
- other => Err(ArrowError::ComputeError(format!(
- "filter not supported for string dictionary with key
of type {:?}",
- other
- )))
- }
- (key_type, value_type) => Err(ArrowError::ComputeError(format!(
- "filter not supported for Dictionary({:?}, {:?})",
- key_type, value_type
- )))
- }
- DataType::List(dt) => match dt.data_type() {
- DataType::UInt8 => {
- filter_primitive_item_list_array!(self, array, UInt8Type,
ListArray, ListBuilder)
- }
- DataType::UInt16 => {
- filter_primitive_item_list_array!(self, array, UInt16Type,
ListArray, ListBuilder)
- }
- DataType::UInt32 => {
- filter_primitive_item_list_array!(self, array, UInt32Type,
ListArray, ListBuilder)
- }
- DataType::UInt64 => {
- filter_primitive_item_list_array!(self, array, UInt64Type,
ListArray, ListBuilder)
- }
- DataType::Int8 => filter_primitive_item_list_array!(self,
array, Int8Type, ListArray, ListBuilder),
- DataType::Int16 => {
- filter_primitive_item_list_array!(self, array, Int16Type,
ListArray, ListBuilder)
- }
- DataType::Int32 => {
- filter_primitive_item_list_array!(self, array, Int32Type,
ListArray, ListBuilder)
- }
- DataType::Int64 => {
- filter_primitive_item_list_array!(self, array, Int64Type,
ListArray, ListBuilder)
- }
- DataType::Float32 => {
- filter_primitive_item_list_array!(self, array,
Float32Type, ListArray, ListBuilder)
- }
- DataType::Float64 => {
- filter_primitive_item_list_array!(self, array,
Float64Type, ListArray, ListBuilder)
- }
- DataType::Boolean => {
- filter_boolean_item_list_array!(self, array, ListArray,
ListBuilder)
- }
- DataType::Date32(_) => {
- filter_primitive_item_list_array!(self, array, Date32Type,
ListArray, ListBuilder)
- }
- DataType::Date64(_) => {
- filter_primitive_item_list_array!(self, array, Date64Type,
ListArray, ListBuilder)
- }
- DataType::Time32(TimeUnit::Second) => {
- filter_primitive_item_list_array!(self, array,
Time32SecondType, ListArray, ListBuilder)
- }
- DataType::Time32(TimeUnit::Millisecond) => {
- filter_primitive_item_list_array!(self, array,
Time32MillisecondType, ListArray, ListBuilder)
- }
- DataType::Time64(TimeUnit::Microsecond) => {
- filter_primitive_item_list_array!(self, array,
Time64MicrosecondType, ListArray, ListBuilder)
- }
- DataType::Time64(TimeUnit::Nanosecond) => {
- filter_primitive_item_list_array!(self, array,
Time64NanosecondType, ListArray, ListBuilder)
- }
- DataType::Duration(TimeUnit::Second) => {
- filter_primitive_item_list_array!(self, array,
DurationSecondType, ListArray, ListBuilder)
- }
- DataType::Duration(TimeUnit::Millisecond) => {
- filter_primitive_item_list_array!(self, array,
DurationMillisecondType, ListArray, ListBuilder)
- }
- DataType::Duration(TimeUnit::Microsecond) => {
- filter_primitive_item_list_array!(self, array,
DurationMicrosecondType, ListArray, ListBuilder)
- }
- DataType::Duration(TimeUnit::Nanosecond) => {
- filter_primitive_item_list_array!(self, array,
DurationNanosecondType, ListArray, ListBuilder)
- }
- DataType::Timestamp(TimeUnit::Second, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampSecondType, ListArray, ListBuilder)
- }
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampMillisecondType, ListArray, ListBuilder)
- }
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampMicrosecondType, ListArray, ListBuilder)
- }
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampNanosecondType, ListArray, ListBuilder)
- }
- DataType::Binary => filter_non_primitive_item_list_array!(
- self,
- array,
- BinaryArray,
- BinaryBuilder,
- ListArray,
- ListBuilder
- ),
- DataType::LargeBinary => filter_non_primitive_item_list_array!(
- self,
- array,
- LargeBinaryArray,
- LargeBinaryBuilder,
- ListArray,
- ListBuilder
- ),
- DataType::Utf8 => filter_non_primitive_item_list_array!(
- self,
- array,
- StringArray,
- StringBuilder,
- ListArray
- ,ListBuilder
- ),
- DataType::LargeUtf8 => filter_non_primitive_item_list_array!(
- self,
- array,
- LargeStringArray,
- LargeStringBuilder,
- ListArray,
- ListBuilder
- ),
- other => {
- Err(ArrowError::ComputeError(format!(
- "filter not supported for List({:?})",
- other
- )))
- }
- }
- DataType::LargeList(dt) => match dt.data_type() {
- DataType::UInt8 => {
- filter_primitive_item_list_array!(self, array, UInt8Type,
LargeListArray, LargeListBuilder)
- }
- DataType::UInt16 => {
- filter_primitive_item_list_array!(self, array, UInt16Type,
LargeListArray, LargeListBuilder)
- }
- DataType::UInt32 => {
- filter_primitive_item_list_array!(self, array, UInt32Type,
LargeListArray, LargeListBuilder)
- }
- DataType::UInt64 => {
- filter_primitive_item_list_array!(self, array, UInt64Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Int8 => filter_primitive_item_list_array!(self,
array, Int8Type, LargeListArray, LargeListBuilder),
- DataType::Int16 => {
- filter_primitive_item_list_array!(self, array, Int16Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Int32 => {
- filter_primitive_item_list_array!(self, array, Int32Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Int64 => {
- filter_primitive_item_list_array!(self, array, Int64Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Float32 => {
- filter_primitive_item_list_array!(self, array,
Float32Type, LargeListArray, LargeListBuilder)
- }
- DataType::Float64 => {
- filter_primitive_item_list_array!(self, array,
Float64Type, LargeListArray, LargeListBuilder)
- }
- DataType::Boolean => {
- filter_boolean_item_list_array!(self, array,
LargeListArray, LargeListBuilder)
- }
- DataType::Date32(_) => {
- filter_primitive_item_list_array!(self, array, Date32Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Date64(_) => {
- filter_primitive_item_list_array!(self, array, Date64Type,
LargeListArray, LargeListBuilder)
- }
- DataType::Time32(TimeUnit::Second) => {
- filter_primitive_item_list_array!(self, array,
Time32SecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Time32(TimeUnit::Millisecond) => {
- filter_primitive_item_list_array!(self, array,
Time32MillisecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Time64(TimeUnit::Microsecond) => {
- filter_primitive_item_list_array!(self, array,
Time64MicrosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Time64(TimeUnit::Nanosecond) => {
- filter_primitive_item_list_array!(self, array,
Time64NanosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Duration(TimeUnit::Second) => {
- filter_primitive_item_list_array!(self, array,
DurationSecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Duration(TimeUnit::Millisecond) => {
- filter_primitive_item_list_array!(self, array,
DurationMillisecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Duration(TimeUnit::Microsecond) => {
- filter_primitive_item_list_array!(self, array,
DurationMicrosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Duration(TimeUnit::Nanosecond) => {
- filter_primitive_item_list_array!(self, array,
DurationNanosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Timestamp(TimeUnit::Second, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampSecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampMillisecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampMicrosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- filter_primitive_item_list_array!(self, array,
TimestampNanosecondType, LargeListArray, LargeListBuilder)
- }
- DataType::Binary => filter_non_primitive_item_list_array!(
- self,
- array,
- BinaryArray,
- BinaryBuilder,
- LargeListArray,
- LargeListBuilder
- ),
- DataType::LargeBinary => filter_non_primitive_item_list_array!(
- self,
- array,
- LargeBinaryArray,
- LargeBinaryBuilder,
- LargeListArray,
- LargeListBuilder
- ),
- DataType::Utf8 => filter_non_primitive_item_list_array!(
- self,
- array,
- StringArray,
- StringBuilder,
- LargeListArray,
- LargeListBuilder
- ),
- DataType::LargeUtf8 => filter_non_primitive_item_list_array!(
- self,
- array,
- LargeStringArray,
- LargeStringBuilder,
- LargeListArray,
- LargeListBuilder
- ),
- other => {
- Err(ArrowError::ComputeError(format!(
- "filter not supported for LargeList({:?})",
- other
- )))
- }
- }
- other => Err(ArrowError::ComputeError(format!(
- "filter not supported for {:?}",
- other
- ))),
- }
- }
-
- /// Returns a new PrimitiveArray<T> containing only those values from the
array passed as the data_array parameter,
- /// selected by the BooleanArray passed as the filter_array parameter
- pub fn filter_primitive_array<T>(
- &self,
- data_array: &PrimitiveArray<T>,
- ) -> Result<PrimitiveArray<T>>
- where
- T: ArrowNumericType,
- {
- let array_type = T::DATA_TYPE;
- let value_size = mem::size_of::<T::Native>();
- let array_data_builder =
- filter_array_impl(self, data_array, array_type, value_size)?;
- let data = array_data_builder.build();
- Ok(PrimitiveArray::<T>::from(data))
- }
-
- /// Returns a new DictionaryArray<T> containing only those keys from the
array passed as the data_array parameter,
- /// selected by the BooleanArray passed as the filter_array parameter. The
values are cloned from the data_array.
- pub fn filter_dictionary_array<T>(
- &self,
- data_array: &DictionaryArray<T>,
- ) -> Result<DictionaryArray<T>>
- where
- T: ArrowNumericType,
- {
- let array_type = data_array.data_type().clone();
- let value_size = mem::size_of::<T::Native>();
- let mut array_data_builder =
- filter_array_impl(self, data_array, array_type, value_size)?;
- // copy dictionary values from input array
- array_data_builder =
- array_data_builder.add_child_data(data_array.values().data());
- let data = array_data_builder.build();
- Ok(DictionaryArray::<T>::from(data))
}
}
-/// Returns a new array, containing only the elements matching the filter.
-pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
- FilterContext::new(filter)?.filter(array)
+/// Returns a function used to filter arbitrary arrays.
+/// This is faster (2x for primitive types) than using [filter] on multiple
arrays, but slower
+/// than [filter] when filtering a single array.
+pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
+ let iter = SlicesIterator::new(filter);
+ let filter_count = iter.filter_count;
+ let chunks = iter.collect::<Vec<_>>();
+
+ Ok(Box::new(move |array: &ArrayData| {
+ let mut mutable = MutableArrayData::new(vec![array], false,
filter_count);
+ chunks
+ .iter()
+ .for_each(|(start, end)| mutable.extend(0, *start, *end));
+ mutable.freeze()
+ }))
}
-/// Returns a new PrimitiveArray<T> containing only those values from the
array passed as the data_array parameter,
-/// selected by the BooleanArray passed as the filter_array parameter
-pub fn filter_primitive_array<T>(
- data_array: &PrimitiveArray<T>,
- filter_array: &BooleanArray,
-) -> Result<PrimitiveArray<T>>
-where
- T: ArrowNumericType,
-{
- FilterContext::new(filter_array)?.filter_primitive_array(data_array)
-}
+/// Filters an [Array], returning elements matching the filter (i.e. where the
values are true).
+/// WARNING: the nulls of `filter` are ignored and the value on its slot is
considered.
Review comment:
I think this WARNING should be included in the doc comments of
`build_filter` as well
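Something like the following on `build_filter`, reusing the wording already
on `filter`:
```
/// Returns a function used to filter arbitrary arrays.
/// This is faster (2x for primitive types) than using [filter] on multiple arrays, but slower
/// than [filter] when filtering a single array.
/// WARNING: the nulls of `filter` are ignored and the value on its slot is considered.
pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
```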
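For future readers, here is roughly how I would expect `build_filter` to be
used when the same predicate is applied to several columns. This is an
untested sketch; the `make_array` / `ArrayData` plumbing and the exact import
paths are my assumptions, not something this PR defines:
```
use std::sync::Arc;

use arrow::array::{make_array, ArrayRef, BooleanArray, Int32Array, StringArray};
use arrow::compute::kernels::filter::build_filter;
use arrow::error::Result;

fn filter_two_columns() -> Result<Vec<ArrayRef>> {
    // Note: no nulls in the predicate here, so the WARNING above does not apply.
    let predicate = BooleanArray::from(vec![true, false, true]);
    let a = Int32Array::from(vec![1, 2, 3]);
    let b = StringArray::from(vec!["x", "y", "z"]);

    // The predicate is analysed once; the returned closure is then applied to
    // each column, which is where the speed-up over calling `filter` per
    // column comes from.
    let filter = build_filter(&predicate)?;

    // The closure maps `&ArrayData` to a filtered `ArrayData`; wrap the result
    // back into an `ArrayRef` (assuming `make_array` takes an Arc'd ArrayData).
    Ok(vec![
        make_array(Arc::new(filter(&a.data()))),
        make_array(Arc::new(filter(&b.data()))),
    ])
}
```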
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]