tustvold commented on a change in pull request #1248:
URL: https://github.com/apache/arrow-rs/pull/1248#discussion_r798380248
##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,68 +267,573 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) ->
BooleanArray {
/// # Ok(())
/// # }
/// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef>
{
- if predicate.null_count() > 0 {
- // this greatly simplifies subsequent filtering code
- // now we only have a boolean mask to deal with
- let predicate = prep_null_mask_filter(predicate);
- return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) ->
Result<ArrayRef> {
+ let predicate = FilterBuilder::new(predicate).build();
+ filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching
the filter.
+pub fn filter_record_batch(
+ record_batch: &RecordBatch,
+ predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+ let filter = FilterBuilder::new(predicate).optimize().build();
+
+ let filtered_arrays = record_batch
+ .columns()
+ .iter()
+ .map(|a| filter_array(a, &filter))
+ .collect::<Result<Vec<_>>>()?;
+
+ RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+ filter: BooleanArray,
+ count: usize,
+ iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+ /// Create a new [`FilterBuilder`] that can be used to construct a
[`FilterPredicate`]
+ pub fn new(filter: &BooleanArray) -> Self {
+ let filter = match filter.null_count() {
+ 0 => BooleanArray::from(filter.data().clone()),
+ _ => prep_null_mask_filter(filter),
+ };
+
+ let count = filter_count(&filter);
+ let selectivity_frac = count as f64 / filter.len() as f64;
+ let iterator = if selectivity_frac > 0.8 {
+ FilterIterator::SlicesIterator
+ } else {
+ FilterIterator::IndexIterator
+ };
+
+ Self {
+ filter,
+ count,
+ iterator,
+ }
+ }
+
+ /// Compute an optimised representation of the provided `filter` mask that
can be
+ /// applied to an array more quickly.
+ ///
+ /// Note: There is limited benefit to calling this to then filter a single
array
+ /// Note: This will likely have a larger memory footprint than the
original mask
+ pub fn optimize(mut self) -> Self {
+ match self.iterator {
+ FilterIterator::SlicesIterator => {
+ let slices = SlicesIterator::new(&self.filter).collect();
+ self.iterator = FilterIterator::Slices(slices)
+ }
+ FilterIterator::IndexIterator => {
+ let indices = IndexIterator::new(&self.filter).collect();
+ self.iterator = FilterIterator::Indices(indices)
+ }
+ _ => {}
+ }
+ self
+ }
+
+ /// Construct the final `FilterPredicate`
+ pub fn build(self) -> FilterPredicate {
+ FilterPredicate {
+ filter: self.filter,
+ count: self.count,
+ iterator: self.iterator,
+ }
+ }
+}
+
+/// The internal iterator type of [`FilterPredicate`]
+#[derive(Debug)]
+enum FilterIterator {
+ // A lazily evaluated iterator of ranges
+ SlicesIterator,
+ // A lazily evaluated iterator of indices
+ IndexIterator,
+ // A precomputed list of indices
+ Indices(Vec<usize>),
+ // A precomputed array of ranges
+ Slices(Vec<(usize, usize)>),
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+ filter: BooleanArray,
+ count: usize,
+ iterator: FilterIterator,
+}
+
+impl FilterPredicate {
+ /// Selects rows from `values` based on this [`FilterPredicate`]
+ pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+ filter_array(values, self)
}
+}
- let filter_count = filter_count(predicate);
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) ->
Result<ArrayRef> {
+ if predicate.filter.len() > values.len() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Filter predicate of length {} is larger than target array of
length {}",
+ predicate.filter.len(),
+ values.len()
+ )));
+ }
- match filter_count {
+ match predicate.count {
0 => {
// return empty
- Ok(new_empty_array(array.data_type()))
+ Ok(new_empty_array(values.data_type()))
}
- len if len == array.len() => {
+ len if len == values.len() => {
// return all
- let data = array.data().clone();
+ let data = values.data().clone();
Ok(make_array(data))
}
- _ => {
- // actually filter
- let mut mutable =
- MutableArrayData::new(vec![array.data_ref()], false,
filter_count);
+ // actually filter
+ _ => match values.data_type() {
+ DataType::Boolean => {
+ let values =
values.as_any().downcast_ref::<BooleanArray>().unwrap();
+ Ok(Arc::new(filter_boolean(values, predicate)))
+ }
+ DataType::Int8 => {
+ downcast_filter!(Int8Type, values, predicate)
+ }
+ DataType::Int16 => {
+ downcast_filter!(Int16Type, values, predicate)
+ }
+ DataType::Int32 => {
+ downcast_filter!(Int32Type, values, predicate)
+ }
+ DataType::Int64 => {
+ downcast_filter!(Int64Type, values, predicate)
+ }
+ DataType::UInt8 => {
+ downcast_filter!(UInt8Type, values, predicate)
+ }
+ DataType::UInt16 => {
+ downcast_filter!(UInt16Type, values, predicate)
+ }
+ DataType::UInt32 => {
+ downcast_filter!(UInt32Type, values, predicate)
+ }
+ DataType::UInt64 => {
+ downcast_filter!(UInt64Type, values, predicate)
+ }
+ DataType::Float32 => {
+ downcast_filter!(Float32Type, values, predicate)
+ }
+ DataType::Float64 => {
+ downcast_filter!(Float64Type, values, predicate)
+ }
+ DataType::Date32 => {
+ downcast_filter!(Date32Type, values, predicate)
+ }
+ DataType::Date64 => {
+ downcast_filter!(Date64Type, values, predicate)
+ }
+ DataType::Time32(Second) => {
+ downcast_filter!(Time32SecondType, values, predicate)
+ }
+ DataType::Time32(Millisecond) => {
+ downcast_filter!(Time32MillisecondType, values, predicate)
+ }
+ DataType::Time64(Microsecond) => {
+ downcast_filter!(Time64MicrosecondType, values, predicate)
+ }
+ DataType::Time64(Nanosecond) => {
+ downcast_filter!(Time64NanosecondType, values, predicate)
+ }
+ DataType::Timestamp(Second, _) => {
+ downcast_filter!(TimestampSecondType, values, predicate)
+ }
+ DataType::Timestamp(Millisecond, _) => {
+ downcast_filter!(TimestampMillisecondType, values, predicate)
+ }
+ DataType::Timestamp(Microsecond, _) => {
+ downcast_filter!(TimestampMicrosecondType, values, predicate)
+ }
+ DataType::Timestamp(Nanosecond, _) => {
+ downcast_filter!(TimestampNanosecondType, values, predicate)
+ }
+ DataType::Interval(IntervalUnit::YearMonth) => {
+ downcast_filter!(IntervalYearMonthType, values, predicate)
+ }
+ DataType::Interval(IntervalUnit::DayTime) => {
+ downcast_filter!(IntervalDayTimeType, values, predicate)
+ }
+ DataType::Interval(IntervalUnit::MonthDayNano) => {
+ downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+ }
+ DataType::Duration(TimeUnit::Second) => {
+ downcast_filter!(DurationSecondType, values, predicate)
+ }
+ DataType::Duration(TimeUnit::Millisecond) => {
+ downcast_filter!(DurationMillisecondType, values, predicate)
+ }
+ DataType::Duration(TimeUnit::Microsecond) => {
+ downcast_filter!(DurationMicrosecondType, values, predicate)
+ }
+ DataType::Duration(TimeUnit::Nanosecond) => {
+ downcast_filter!(DurationNanosecondType, values, predicate)
+ }
+ DataType::Utf8 => {
+ let values = values
+ .as_any()
+ .downcast_ref::<GenericStringArray<i32>>()
+ .unwrap();
+ Ok(Arc::new(filter_string::<i32>(values, predicate)))
+ }
+ DataType::LargeUtf8 => {
+ let values = values
+ .as_any()
+ .downcast_ref::<GenericStringArray<i64>>()
+ .unwrap();
+ Ok(Arc::new(filter_string::<i64>(values, predicate)))
+ }
+ DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+ DataType::Int8 => downcast_dict_filter!(Int8Type, values,
predicate),
+ DataType::Int16 => downcast_dict_filter!(Int16Type, values,
predicate),
+ DataType::Int32 => downcast_dict_filter!(Int32Type, values,
predicate),
+ DataType::Int64 => downcast_dict_filter!(Int64Type, values,
predicate),
+ DataType::UInt8 => downcast_dict_filter!(UInt8Type, values,
predicate),
+ DataType::UInt16 => downcast_dict_filter!(UInt16Type, values,
predicate),
+ DataType::UInt32 => downcast_dict_filter!(UInt32Type, values,
predicate),
+ DataType::UInt64 => downcast_dict_filter!(UInt64Type, values,
predicate),
+ t => unimplemented!("Take not supported for dictionary key
type {:?}", t),
+ },
+ _ => {
+ // fallback to using MutableArrayData
+ let mut mutable = MutableArrayData::new(
+ vec![values.data_ref()],
+ false,
+ predicate.count,
+ );
+
+ let iter = SlicesIterator::new(&predicate.filter);
+ iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
+ let data = mutable.freeze();
+ Ok(make_array(data))
+ }
+ },
+ }
+}
+
+/// Computes a new null mask for `data` based on `predicate`
+///
+/// Returns `None` if no nulls in the result
+fn filter_null_mask(
+ data: &ArrayData,
+ predicate: &FilterPredicate,
+) -> Option<(usize, Buffer)> {
+ if data.null_count() == 0 {
+ return None;
+ }
- let iter = SlicesIterator::new(predicate);
- iter.for_each(|(start, end)| mutable.extend(0, start, end));
+ let nulls = filter_bits(data.null_buffer()?, data.offset(), predicate);
+ let null_count = predicate.count - nulls.count_set_bits();
- let data = mutable.freeze();
- Ok(make_array(data))
+ if null_count == 0 {
+ return None;
+ }
+
+ Some((null_count, nulls))
+}
+
+/// Filter the packed bitmask `buffer`, with `predicate` starting at bit
offset `offset`
+fn filter_bits(buffer: &Buffer, offset: usize, predicate: &FilterPredicate) ->
Buffer {
+ let src = buffer.as_slice();
+
+ match &predicate.iterator {
+ FilterIterator::IndexIterator => {
+ let bits = IndexIterator::new(&predicate.filter)
+ .map(|src_idx| bit_util::get_bit(src, src_idx + offset));
+ unsafe { collect_filtered_bits(bits, predicate.count) }
+ }
+ FilterIterator::Indices(indices) => {
+ let bits = indices
+ .iter()
+ .map(|src_idx| bit_util::get_bit(src, *src_idx + offset));
+ unsafe { collect_filtered_bits(bits, predicate.count) }
+ }
+ FilterIterator::SlicesIterator => {
+ let mut builder =
+ BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+ for (start, end) in SlicesIterator::new(&predicate.filter) {
+ builder.append_packed_range(start..end, src)
+ }
+ builder.finish()
+ }
+ FilterIterator::Slices(slices) => {
+ let mut builder =
+ BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+ for (start, end) in slices {
+ builder.append_packed_range(*start..*end, src)
+ }
+ builder.finish()
}
}
}
-/// Returns a new [RecordBatch] with arrays containing only values matching
the filter.
-pub fn filter_record_batch(
- record_batch: &RecordBatch,
- predicate: &BooleanArray,
-) -> Result<RecordBatch> {
- if predicate.null_count() > 0 {
- // this greatly simplifies subsequent filtering code
- // now we only have a boolean mask to deal with
- let predicate = prep_null_mask_filter(predicate);
- return filter_record_batch(record_batch, &predicate);
+/// Collects the provided boolean iterator into a `Buffer`
+///
+/// Largely copied from `MutableBuffer::from_trusted_len_iter_bool`
+///
+/// TODO: DRY this up
+unsafe fn collect_filtered_bits(
+ mut src: impl Iterator<Item = bool>,
+ bit_len: usize,
+) -> Buffer {
+ let aligned_len = bit_util::ceil(bit_len, 64) * 8;
+ let mut buffer = MutableBuffer::new(aligned_len);
+
+ 'a: loop {
+ let mut byte_accum: u64 = 0;
+ let mut mask: u64 = 1;
+
+ while mask != 0 {
+ if let Some(value) = src.next() {
+ byte_accum |= match value {
+ true => mask,
+ false => 0,
+ };
+ mask <<= 1;
+ } else {
+ if mask != 1 {
+ buffer.push_unchecked(byte_accum);
+ }
+ break 'a;
+ }
+ }
+ buffer.push_unchecked(byte_accum);
}
- let num_columns = record_batch.columns().len();
+ // Truncate to byte length - technically not necessary but cannot hurt
+ buffer.resize(bit_util::ceil(bit_len, 8), 0);
+ buffer.into()
+}
+
+/// `filter` implementation for boolean buffers
+fn filter_boolean(values: &BooleanArray, predicate: &FilterPredicate) ->
BooleanArray {
+ let data = values.data();
+ assert_eq!(data.buffers().len(), 1);
+ assert_eq!(data.child_data().len(), 0);
+
+ let values = filter_bits(&data.buffers()[0], data.offset(), predicate);
+
+ let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+ .len(predicate.count)
+ .add_buffer(values);
- let filtered_arrays = match num_columns {
- 1 => {
- vec![filter(record_batch.columns()[0].as_ref(), predicate)?]
+ if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+ builder = builder.null_count(null_count).null_bit_buffer(nulls);
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ BooleanArray::from(data)
+}
+
+/// `filter` implementation for primitive arrays
+fn filter_primitive<T>(
+ values: &PrimitiveArray<T>,
+ predicate: &FilterPredicate,
+) -> PrimitiveArray<T>
+where
+ T: ArrowPrimitiveType,
+{
+ let data = values.data();
+ assert_eq!(data.buffers().len(), 1);
+ assert_eq!(data.child_data().len(), 0);
+
+ let values = data.buffer::<T::Native>(0);
+ assert!(values.len() >= predicate.filter.len());
+
+ let mut buffer = MutableBuffer::with_capacity(predicate.count *
T::get_byte_width());
+
+ match &predicate.iterator {
+ FilterIterator::SlicesIterator => {
+ for (start, end) in SlicesIterator::new(&predicate.filter) {
+ buffer.extend_from_slice(&values[start..end]);
+ }
}
- _ => {
- let filter = build_filter(predicate)?;
- record_batch
- .columns()
- .iter()
- .map(|a| make_array(filter(a.data())))
- .collect()
+ FilterIterator::Slices(slices) => {
+ for (start, end) in slices {
+ buffer.extend_from_slice(&values[*start..*end]);
+ }
}
+ FilterIterator::IndexIterator => {
+ for idx in IndexIterator::new(&predicate.filter) {
+ unsafe { buffer.push_unchecked(values[idx]) };
+ }
+ }
+ FilterIterator::Indices(indices) => {
+ for idx in indices {
+ unsafe { buffer.push_unchecked(values[*idx]) };
+ }
+ }
+ }
+
+ let mut builder = ArrayDataBuilder::new(data.data_type().clone())
+ .len(predicate.count)
+ .add_buffer(buffer.into());
+
+ if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+ builder = builder.null_count(null_count).null_bit_buffer(nulls);
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ PrimitiveArray::from(data)
+}
+
+/// [`FilterString`] is created from a source [`GenericStringArray`] and can be
+/// used to build a new [`GenericStringArray`] by copying values from the
source
+///
+/// TODO(raphael): Could this be used for the take kernel as well?
+struct FilterString<'a, OffsetSize> {
Review comment:
In subsequent PRs I intend to experiment with applying a similar
construction to primitive arrays and null bitmasks, e.g. `FilterPrimitive` and
`FilterNull`, as I think it might compose nicely and potentially allow for code
sharing with the take kernels.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org