tustvold commented on a change in pull request #1248:
URL: https://github.com/apache/arrow-rs/pull/1248#discussion_r801080970
##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +287,600 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) ->
BooleanArray {
/// # Ok(())
/// # }
/// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef>
{
- if predicate.null_count() > 0 {
- // this greatly simplifies subsequent filtering code
- // now we only have a boolean mask to deal with
- let predicate = prep_null_mask_filter(predicate);
- return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) ->
Result<ArrayRef> {
+ let predicate = FilterBuilder::new(predicate).build();
+ filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching
the filter.
+pub fn filter_record_batch(
+ record_batch: &RecordBatch,
+ predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+ let mut filter_builder = FilterBuilder::new(predicate);
+ if record_batch.num_columns() > 1 {
+ // Only optimize if filtering more than one column
+ filter_builder = filter_builder.optimize();
+ }
+ let filter = filter_builder.build();
+
+ let filtered_arrays = record_batch
+ .columns()
+ .iter()
+ .map(|a| filter_array(a, &filter))
+ .collect::<Result<Vec<_>>>()?;
+
+ RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+ filter: BooleanArray,
+ count: usize,
+ strategy: IterationStrategy,
+}
+
+impl FilterBuilder {
+ /// Create a new [`FilterBuilder`] that can be used to construct a
[`FilterPredicate`]
+ pub fn new(filter: &BooleanArray) -> Self {
+ let filter = match filter.null_count() {
+ 0 => BooleanArray::from(filter.data().clone()),
+ _ => prep_null_mask_filter(filter),
+ };
+
+ let count = filter_count(&filter);
+ let strategy = IterationStrategy::default_strategy(filter.len(),
count);
+
+ Self {
+ filter,
+ count,
+ strategy,
+ }
}
- let filter_count = filter_count(predicate);
+ /// Compute an optimised representation of the provided `filter` mask that
can be
+ /// applied to an array more quickly.
+ ///
+ /// Note: There is limited benefit to calling this to then filter a single
array
+ /// Note: This will likely have a larger memory footprint than the
original mask
+ pub fn optimize(mut self) -> Self {
+ match self.strategy {
+ IterationStrategy::SlicesIterator => {
+ let slices = SlicesIterator::new(&self.filter).collect();
+ self.strategy = IterationStrategy::Slices(slices)
+ }
+ IterationStrategy::IndexIterator => {
+ let indices = IndexIterator::new(&self.filter,
self.count).collect();
+ self.strategy = IterationStrategy::Indices(indices)
+ }
+ _ => {}
+ }
+ self
+ }
- match filter_count {
- 0 => {
- // return empty
- Ok(new_empty_array(array.data_type()))
+ /// Construct the final `FilterPredicate`
+ pub fn build(self) -> FilterPredicate {
+ FilterPredicate {
+ filter: self.filter,
+ count: self.count,
+ strategy: self.strategy,
}
- len if len == array.len() => {
- // return all
- let data = array.data().clone();
- Ok(make_array(data))
+ }
+}
+
+/// The iteration strategy used to evaluate [`FilterPredicate`]
+#[derive(Debug)]
+enum IterationStrategy {
+ /// A lazily evaluated iterator of ranges
+ SlicesIterator,
+ /// A lazily evaluated iterator of indices
+ IndexIterator,
+ /// A precomputed list of indices
+ Indices(Vec<usize>),
+ /// A precomputed array of ranges
+ Slices(Vec<(usize, usize)>),
+ /// Select all rows
+ All,
+ /// Select no rows
+ None,
+}
+
+impl IterationStrategy {
+ /// The default [`IterationStrategy`] for a filter of length
`filter_length`
+ /// and selecting `filter_count` rows
+ fn default_strategy(filter_length: usize, filter_count: usize) -> Self {
+ if filter_length == 0 || filter_count == 0 {
+ return IterationStrategy::None;
}
- _ => {
- // actually filter
- let mut mutable =
- MutableArrayData::new(vec![array.data_ref()], false,
filter_count);
- let iter = SlicesIterator::new(predicate);
- iter.for_each(|(start, end)| mutable.extend(0, start, end));
+ if filter_count == filter_length {
+ return IterationStrategy::All;
+ }
- let data = mutable.freeze();
- Ok(make_array(data))
+ // Compute the selectivity of the predicate by dividing the number of
true
+ // bits in the predicate by the predicate's total length
+ //
+ // This can then be used as a heuristic for the optimal iteration
strategy
+ let selectivity_frac = filter_count as f64 / filter_length as f64;
+ if selectivity_frac > FILTER_SLICES_SELECTIVITY_THRESHOLD {
+ return IterationStrategy::SlicesIterator;
}
+ IterationStrategy::IndexIterator
}
}
-/// Returns a new [RecordBatch] with arrays containing only values matching
the filter.
-pub fn filter_record_batch(
- record_batch: &RecordBatch,
- predicate: &BooleanArray,
-) -> Result<RecordBatch> {
- if predicate.null_count() > 0 {
- // this greatly simplifies subsequent filtering code
- // now we only have a boolean mask to deal with
- let predicate = prep_null_mask_filter(predicate);
- return filter_record_batch(record_batch, &predicate);
+/// A filtering predicate that can be applied to an [`Array`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+ filter: BooleanArray,
+ count: usize,
+ strategy: IterationStrategy,
+}
+
+impl FilterPredicate {
+ /// Selects rows from `values` based on this [`FilterPredicate`]
+ pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+ filter_array(values, self)
}
+}
- let num_columns = record_batch.columns().len();
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) ->
Result<ArrayRef> {
+ if predicate.filter.len() > values.len() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Filter predicate of length {} is larger than target array of
length {}",
+ predicate.filter.len(),
+ values.len()
+ )));
+ }
- let filtered_arrays = match num_columns {
- 1 => {
- vec![filter(record_batch.columns()[0].as_ref(), predicate)?]
+ match predicate.strategy {
+ IterationStrategy::None => Ok(new_empty_array(values.data_type())),
+ IterationStrategy::All => Ok(make_array(values.data().slice(0,
predicate.count))),
+ // actually filter
+ _ => match values.data_type() {
+ DataType::Boolean => {
+ let values =
values.as_any().downcast_ref::<BooleanArray>().unwrap();
+ Ok(Arc::new(filter_boolean(values, predicate)))
+ }
+ DataType::Int8 => {
+ downcast_filter!(Int8Type, values, predicate)
+ }
+ DataType::Int16 => {
+ downcast_filter!(Int16Type, values, predicate)
+ }
+ DataType::Int32 => {
+ downcast_filter!(Int32Type, values, predicate)
+ }
+ DataType::Int64 => {
+ downcast_filter!(Int64Type, values, predicate)
+ }
+ DataType::UInt8 => {
+ downcast_filter!(UInt8Type, values, predicate)
+ }
+ DataType::UInt16 => {
+ downcast_filter!(UInt16Type, values, predicate)
+ }
+ DataType::UInt32 => {
+ downcast_filter!(UInt32Type, values, predicate)
+ }
+ DataType::UInt64 => {
+ downcast_filter!(UInt64Type, values, predicate)
+ }
+ DataType::Float32 => {
+ downcast_filter!(Float32Type, values, predicate)
+ }
+ DataType::Float64 => {
+ downcast_filter!(Float64Type, values, predicate)
+ }
+ DataType::Date32 => {
+ downcast_filter!(Date32Type, values, predicate)
+ }
+ DataType::Date64 => {
+ downcast_filter!(Date64Type, values, predicate)
+ }
+ DataType::Time32(Second) => {
+ downcast_filter!(Time32SecondType, values, predicate)
+ }
+ DataType::Time32(Millisecond) => {
+ downcast_filter!(Time32MillisecondType, values, predicate)
+ }
+ DataType::Time64(Microsecond) => {
+ downcast_filter!(Time64MicrosecondType, values, predicate)
+ }
+ DataType::Time64(Nanosecond) => {
+ downcast_filter!(Time64NanosecondType, values, predicate)
+ }
+ DataType::Timestamp(Second, _) => {
+ downcast_filter!(TimestampSecondType, values, predicate)
+ }
+ DataType::Timestamp(Millisecond, _) => {
+ downcast_filter!(TimestampMillisecondType, values, predicate)
+ }
+ DataType::Timestamp(Microsecond, _) => {
+ downcast_filter!(TimestampMicrosecondType, values, predicate)
+ }
+ DataType::Timestamp(Nanosecond, _) => {
+ downcast_filter!(TimestampNanosecondType, values, predicate)
+ }
+ DataType::Interval(IntervalUnit::YearMonth) => {
+ downcast_filter!(IntervalYearMonthType, values, predicate)
+ }
+ DataType::Interval(IntervalUnit::DayTime) => {
+ downcast_filter!(IntervalDayTimeType, values, predicate)
+ }
+ DataType::Interval(IntervalUnit::MonthDayNano) => {
+ downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+ }
+ DataType::Duration(TimeUnit::Second) => {
+ downcast_filter!(DurationSecondType, values, predicate)
+ }
+ DataType::Duration(TimeUnit::Millisecond) => {
+ downcast_filter!(DurationMillisecondType, values, predicate)
+ }
+ DataType::Duration(TimeUnit::Microsecond) => {
+ downcast_filter!(DurationMicrosecondType, values, predicate)
+ }
+ DataType::Duration(TimeUnit::Nanosecond) => {
+ downcast_filter!(DurationNanosecondType, values, predicate)
+ }
+ DataType::Utf8 => {
+ let values = values
+ .as_any()
+ .downcast_ref::<GenericStringArray<i32>>()
+ .unwrap();
+ Ok(Arc::new(filter_string::<i32>(values, predicate)))
+ }
+ DataType::LargeUtf8 => {
+ let values = values
+ .as_any()
+ .downcast_ref::<GenericStringArray<i64>>()
+ .unwrap();
+ Ok(Arc::new(filter_string::<i64>(values, predicate)))
+ }
+ DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+ DataType::Int8 => downcast_dict_filter!(Int8Type, values,
predicate),
+ DataType::Int16 => downcast_dict_filter!(Int16Type, values,
predicate),
+ DataType::Int32 => downcast_dict_filter!(Int32Type, values,
predicate),
+ DataType::Int64 => downcast_dict_filter!(Int64Type, values,
predicate),
+ DataType::UInt8 => downcast_dict_filter!(UInt8Type, values,
predicate),
+ DataType::UInt16 => downcast_dict_filter!(UInt16Type, values,
predicate),
+ DataType::UInt32 => downcast_dict_filter!(UInt32Type, values,
predicate),
+ DataType::UInt64 => downcast_dict_filter!(UInt64Type, values,
predicate),
+ t => {
+ unimplemented!("Filter not supported for dictionary key
type {:?}", t)
+ }
+ },
+ _ => {
+ // fallback to using MutableArrayData
+ let mut mutable = MutableArrayData::new(
+ vec![values.data_ref()],
+ false,
+ predicate.count,
+ );
+
+ match &predicate.strategy {
+ IterationStrategy::Slices(slices) => {
+ slices
+ .iter()
+ .for_each(|(start, end)| mutable.extend(0, *start,
*end));
+ }
+ _ => {
+ let iter = SlicesIterator::new(&predicate.filter);
+ iter.for_each(|(start, end)| mutable.extend(0, start,
end));
Review comment:
In short because `MutableArrayData` doesn't support other iteration
strategies and I'm not sure that calling it with "ranges" of length 1 will
outperform at least attempting to call it with larger ranges.
My 2 cents is that the eventual goal should be to remove this fallback, as
`MutableArrayData` is a poor fit for filtering where the ranges are typically
not substantial enough to amortize its per-range overheads effectively
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]