alamb commented on code in PR #9755:
URL: https://github.com/apache/arrow-rs/pull/9755#discussion_r3342321491
##########
arrow-select/src/coalesce/byte_view.rs:
##########
@@ -346,6 +420,34 @@ impl<B: ByteViewType> InProgressArray for
InProgressByteViewArray<B> {
Ok(())
}
+ fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(),
ArrowError> {
+ self.ensure_capacity();
+ let source = self.source.take().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "Internal Error: InProgressByteViewArray: source not
set".to_string(),
+ )
+ })?;
+
+ let s = source.array.as_byte_view::<B>();
+
+ if !s.data_buffers().is_empty() {
+ // The fused coalescer only selects this implementation for
all-inline
+ // views. Keep this fallback local so the trait method remains
correct
Review Comment:
Rather than dead code that can not be called, can we please guard against
regressions by:
1. Test coverage
2. Returning an internal error
I think this is more defensive than trying to leave code to cover
unreachable paths -- the problem with leaving a fallback that can't be tested
is that it is susceptible to bitrot, so if it ever does get hit, it may be silently
incorrect
##########
arrow-select/src/coalesce/primitive.rs:
##########
@@ -94,6 +135,43 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for
InProgressPrimitiveArray
Ok(())
}
+ fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(),
ArrowError> {
+ match RowSelection::from_filter(filter) {
+ RowSelection::Indices(indices) => {
+ let s = self
+ .source
+ .as_ref()
+ .ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "Internal Error: InProgressPrimitiveArray: source
not set".to_string(),
+ )
+ })?
+ .as_primitive::<T>();
+
+ append_filtered_nulls(&mut self.nulls, s.nulls(), filter);
+ self.append_values_by_indices(indices.iter().copied())
+ }
+ RowSelection::IndexIterator {
+ filter: filter_array,
+ count,
+ } => {
+ let s = self
Review Comment:
the repetition is unfortunate -- I wonder if a helper method like
`self.primitive_source<T>()` could avoid it 🤔
##########
arrow/benches/coalesce_kernels.rs:
##########
@@ -51,13 +51,27 @@ fn add_all_filter_benchmarks(c: &mut Criterion) {
true,
)]));
+ // Single BinaryViewArray
Review Comment:
If we put the changes in this benchmark into a new PR I could run the
automated tests to see how it improves.
##########
arrow-select/src/coalesce.rs:
##########
@@ -566,6 +589,189 @@ impl BatchCoalescer {
}
}
+#[inline]
+fn find_inline_view_columns(schema: &SchemaRef) -> Option<InlineViewColumns> {
+ let mut utf8_view_indices = Vec::new();
+ let mut binary_view_indices = Vec::new();
+
+ for (index, field) in schema.fields().iter().enumerate() {
+ if field.data_type().is_primitive() {
+ continue;
+ }
+
+ match field.data_type() {
+ DataType::Utf8View => utf8_view_indices.push(index),
+ DataType::BinaryView => binary_view_indices.push(index),
+ // Keep the fused path narrow until other non-primitive array types
+ // can consume the shared row selection without first materializing
+ // a filtered array.
+ _ => return None,
+ }
+ }
+
+ (!utf8_view_indices.is_empty() ||
!binary_view_indices.is_empty()).then_some(
+ InlineViewColumns {
+ utf8_view_indices,
+ binary_view_indices,
+ },
+ )
+}
+
+impl InlineViewColumns {
+ #[inline]
+ fn are_inline_in(&self, batch: &RecordBatch) -> bool {
+ // Only candidate schemas pay the per-batch check that all supported
+ // view values are inline and eligible for the fused direct-copy path.
+ for &index in &self.utf8_view_indices {
+ let is_inline = batch
+ .columns()
+ .get(index)
+ .and_then(|array| array.as_string_view_opt())
+ .is_some_and(|view| view.data_buffers().is_empty());
+
+ if !is_inline {
+ return false;
+ }
+ }
+
+ for &index in &self.binary_view_indices {
+ let is_inline = batch
+ .columns()
+ .get(index)
+ .and_then(|array| array.as_binary_view_opt())
+ .is_some_and(|view| view.data_buffers().is_empty());
+
+ if !is_inline {
+ return false;
+ }
+ }
+
+ true
+ }
+}
+
+impl BatchCoalescer {
+ fn push_batch_with_fused_inline_filter(
+ &mut self,
+ batch: RecordBatch,
+ filter: &BooleanArray,
+ ) -> Result<(), ArrowError> {
+ if filter.len() > batch.num_rows() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Filter predicate of length {} is larger than target array of
length {}",
+ filter.len(),
+ batch.num_rows()
+ )));
+ }
+
+ let mut filter_builder = FilterBuilder::new(filter);
+ if batch.num_columns() > 1 {
+ filter_builder = filter_builder.optimize();
+ }
+ let predicate = filter_builder.build();
+ let selected_count = predicate.count();
+
+ if selected_count == 0 {
+ return Ok(());
+ }
+
+ if selected_count == batch.num_rows() && filter.len() ==
batch.num_rows() {
+ return self.push_batch(batch);
+ }
+
+ let exceeds_coalesce_limit = self
+ .biggest_coalesce_batch_size
+ .is_some_and(|limit| selected_count > limit);
+ // The generic filter kernel is faster for dense filters. The 4x check
+ // keeps this fused path to filters selecting at most 25% of rows.
+ let is_dense_filter = selected_count.saturating_mul(4) > filter.len();
+ let does_not_fit_buffer = selected_count > self.target_batch_size -
self.buffered_rows;
+
+ if exceeds_coalesce_limit || is_dense_filter || does_not_fit_buffer {
+ let filtered_batch = predicate.filter_record_batch(&batch)?;
+ return self.push_batch(filtered_batch);
+ }
+
+ let all_views_are_inline = self
+ .inline_view_columns
+ .as_ref()
+ .is_some_and(|columns| columns.are_inline_in(&batch));
+
+ if !all_views_are_inline {
+ let filtered_batch = predicate.filter_record_batch(&batch)?;
+ return self.push_batch(filtered_batch);
+ }
+
+ let (_schema, arrays, _num_rows) = batch.into_parts();
+
+ if arrays.len() != self.in_progress_arrays.len() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Batch has {} columns but BatchCoalescer expects {}",
+ arrays.len(),
+ self.in_progress_arrays.len()
+ )));
+ }
+
+ self.in_progress_arrays
+ .iter_mut()
+ .zip(arrays)
+ .for_each(|(in_progress, array)| {
+ in_progress.set_source(Some(array));
+ });
+
+ let result = (|| {
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows_by_filter(&predicate)?;
+ }
+
+ self.buffered_rows += selected_count;
+ if self.buffered_rows >= self.target_batch_size {
+ self.finish_buffered_batch()?;
+ }
+
+ Ok(())
+ })();
+
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.set_source(None);
+ }
+
+ result
+ }
+}
+
+pub(crate) enum RowSelection<'a> {
Review Comment:
Maybe we could also call it FilterSelection rather than RowSelection to
avoid name clash with the RowSelection in Parquet 🤔
##########
arrow-select/src/coalesce.rs:
##########
@@ -566,6 +589,189 @@ impl BatchCoalescer {
}
}
+#[inline]
+fn find_inline_view_columns(schema: &SchemaRef) -> Option<InlineViewColumns> {
+ let mut utf8_view_indices = Vec::new();
+ let mut binary_view_indices = Vec::new();
+
+ for (index, field) in schema.fields().iter().enumerate() {
+ if field.data_type().is_primitive() {
+ continue;
+ }
+
+ match field.data_type() {
+ DataType::Utf8View => utf8_view_indices.push(index),
+ DataType::BinaryView => binary_view_indices.push(index),
+ // Keep the fused path narrow until other non-primitive array types
+ // can consume the shared row selection without first materializing
+ // a filtered array.
+ _ => return None,
+ }
+ }
+
+ (!utf8_view_indices.is_empty() ||
!binary_view_indices.is_empty()).then_some(
+ InlineViewColumns {
+ utf8_view_indices,
+ binary_view_indices,
+ },
+ )
+}
+
+impl InlineViewColumns {
+ #[inline]
+ fn are_inline_in(&self, batch: &RecordBatch) -> bool {
+ // Only candidate schemas pay the per-batch check that all supported
+ // view values are inline and eligible for the fused direct-copy path.
+ for &index in &self.utf8_view_indices {
+ let is_inline = batch
+ .columns()
+ .get(index)
+ .and_then(|array| array.as_string_view_opt())
+ .is_some_and(|view| view.data_buffers().is_empty());
+
+ if !is_inline {
+ return false;
+ }
+ }
+
+ for &index in &self.binary_view_indices {
+ let is_inline = batch
+ .columns()
+ .get(index)
+ .and_then(|array| array.as_binary_view_opt())
+ .is_some_and(|view| view.data_buffers().is_empty());
+
+ if !is_inline {
+ return false;
+ }
+ }
+
+ true
+ }
+}
+
+impl BatchCoalescer {
+ fn push_batch_with_fused_inline_filter(
+ &mut self,
+ batch: RecordBatch,
+ filter: &BooleanArray,
+ ) -> Result<(), ArrowError> {
+ if filter.len() > batch.num_rows() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Filter predicate of length {} is larger than target array of
length {}",
+ filter.len(),
+ batch.num_rows()
+ )));
+ }
+
+ let mut filter_builder = FilterBuilder::new(filter);
+ if batch.num_columns() > 1 {
+ filter_builder = filter_builder.optimize();
+ }
+ let predicate = filter_builder.build();
+ let selected_count = predicate.count();
+
+ if selected_count == 0 {
+ return Ok(());
+ }
+
+ if selected_count == batch.num_rows() && filter.len() ==
batch.num_rows() {
+ return self.push_batch(batch);
+ }
+
+ let exceeds_coalesce_limit = self
+ .biggest_coalesce_batch_size
+ .is_some_and(|limit| selected_count > limit);
+ // The generic filter kernel is faster for dense filters. The 4x check
+ // keeps this fused path to filters selecting at most 25% of rows.
+ let is_dense_filter = selected_count.saturating_mul(4) > filter.len();
+ let does_not_fit_buffer = selected_count > self.target_batch_size -
self.buffered_rows;
+
+ if exceeds_coalesce_limit || is_dense_filter || does_not_fit_buffer {
+ let filtered_batch = predicate.filter_record_batch(&batch)?;
+ return self.push_batch(filtered_batch);
+ }
+
+ let all_views_are_inline = self
+ .inline_view_columns
+ .as_ref()
+ .is_some_and(|columns| columns.are_inline_in(&batch));
+
+ if !all_views_are_inline {
+ let filtered_batch = predicate.filter_record_batch(&batch)?;
+ return self.push_batch(filtered_batch);
+ }
+
+ let (_schema, arrays, _num_rows) = batch.into_parts();
+
+ if arrays.len() != self.in_progress_arrays.len() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Batch has {} columns but BatchCoalescer expects {}",
+ arrays.len(),
+ self.in_progress_arrays.len()
+ )));
+ }
+
+ self.in_progress_arrays
+ .iter_mut()
+ .zip(arrays)
+ .for_each(|(in_progress, array)| {
+ in_progress.set_source(Some(array));
+ });
+
+ let result = (|| {
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows_by_filter(&predicate)?;
+ }
+
+ self.buffered_rows += selected_count;
+ if self.buffered_rows >= self.target_batch_size {
+ self.finish_buffered_batch()?;
+ }
+
+ Ok(())
+ })();
+
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.set_source(None);
+ }
+
+ result
+ }
+}
+
+pub(crate) enum RowSelection<'a> {
Review Comment:
Could we add comments that explain what this is doing? I think the idea is
that it allows reusing an IterationStrategy without rematerializing, which is
quite clever
https://github.com/apache/arrow-rs/blob/c736cc511c8778e858747ba6139116b5ebac22bd/arrow-select/src/filter.rs#L330
What do you think about adding it as a normal API directly to
FilterPredicate?
1. Move RowSelection into filter.rs
```rust
// arrow-select/src/filter.rs
/// A borrowed description of which rows a [`FilterPredicate`] selects.
///
/// This is the borrowed projection of the predicate's internal
/// [`IterationStrategy`]: the owned `Slices`/`Indices` are borrowed
directly,
/// and the lazy iterator strategies carry the filter array + count their
/// iterators need.
pub(crate) enum RowSelection<'a> {
None,
All { len: usize },
Slices(&'a [(usize, usize)]),
SlicesIterator(&'a BooleanArray),
Indices(&'a [usize]),
IndexIterator { filter: &'a BooleanArray, count: usize },
}
```
2. Add the method on FilterPredicate
```rust
// arrow-select/src/filter.rs (impl FilterPredicate, near count())
impl FilterPredicate {
/// Returns a borrowed [`RowSelection`] describing the rows this
predicate selects.
pub(crate) fn selection(&self) -> RowSelection<'_> {
match &self.strategy {
IterationStrategy::None => RowSelection::None,
IterationStrategy::All => RowSelection::All { len: self.count
},
IterationStrategy::Slices(slices) =>
RowSelection::Slices(slices),
IterationStrategy::SlicesIterator =>
RowSelection::SlicesIterator(&self.filter),
IterationStrategy::Indices(indices) =>
RowSelection::Indices(indices),
IterationStrategy::IndexIterator =>
RowSelection::IndexIterator {
filter: &self.filter,
count: self.count,
},
}
}
}
```
##########
arrow-select/src/coalesce.rs:
##########
@@ -145,6 +149,19 @@ pub struct BatchCoalescer {
completed: VecDeque<RecordBatch>,
/// Biggest coalesce batch size. See
[`Self::with_biggest_coalesce_batch_size`]
biggest_coalesce_batch_size: Option<usize>,
+ /// Inline view columns eligible for the fused filter path.
+ inline_view_columns: Option<InlineViewColumns>,
+}
+
+/// Inline Utf8View/BinaryView columns in a schema.
+///
+/// These columns can copy filtered views directly into the coalescer without
Review Comment:
I don't fully follow this logic or why the BatchCoalescer needs to be
tracking per-type state when the rest of the code has per-type state in
`InProgressArray` 🤔
What do you think about modeling this as a separate type of
`InProgressArray` -- for example instead of making some different paths for
`InProgressByteViewArray` we could instead make `InProgressInlineByteViewArray`
-- and then have a function that converts from `InProgressInlineByteViewArray`
to `InProgressByteViewArray` if the input has any non-inlined views
I think that would make it harder to misuse the APIs
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]