alamb commented on code in PR #8951:
URL: https://github.com/apache/arrow-rs/pull/8951#discussion_r2701054448
##########
arrow-select/src/coalesce/primitive.rs:
##########
@@ -95,6 +96,145 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for InProgressPrimitiveArray
         Ok(())
     }
+    /// Copy rows using a predicate
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
+        self.ensure_capacity();
+
+        let s = self
+            .source
+            .as_ref()
+            .ok_or_else(|| {
+                ArrowError::InvalidArgumentError(
+                    "Internal Error: InProgressPrimitiveArray: source not set".to_string(),
+                )
+            })?
+            .as_primitive::<T>();
+
+        let values = s.values();
+        let count = filter.count();
+
+        // Use the predicate's strategy for optimal iteration
+        match filter.strategy() {
+            IterationStrategy::SlicesIterator => {
+                // Copy values, nulls using slices
+                if let Some(nulls) = s.nulls().filter(|n| n.null_count() > 0) {
+                    for (start, end) in SlicesIterator::new(filter.filter_array()) {
+                        // SAFETY: slices are derived from filter predicate
+                        self.current
+                            .extend_from_slice(unsafe { values.get_unchecked(start..end) });
+                        let slice = nulls.slice(start, end - start);
Review Comment:
One thing we could also check is adding something like a `nulls.append_buffer_sliced` to avoid having to call `slice` on the buffer (and doing an atomic refcount increment each time)
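A rough sketch of the idea, as a hypothetical extension trait (`append_buffer_sliced` does not exist in `arrow-buffer` today, and the bit-by-bit loop is illustration only; a real implementation would copy word-aligned chunks from the packed bitmap):
```rust
use arrow_buffer::{NullBuffer, NullBufferBuilder};

/// Hypothetical: append `len` validity bits starting at `offset` from
/// `source` without materializing an intermediate `NullBuffer` via
/// `source.slice(..)` (which clones the inner `Buffer`, an atomic increment)
trait AppendBufferSliced {
    fn append_buffer_sliced(&mut self, source: &NullBuffer, offset: usize, len: usize);
}

impl AppendBufferSliced for NullBufferBuilder {
    fn append_buffer_sliced(&mut self, source: &NullBuffer, offset: usize, len: usize) {
        // Illustration only: copy one bit at a time
        for i in offset..offset + len {
            self.append(source.is_valid(i));
        }
    }
}
```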
##########
arrow-select/src/coalesce.rs:
##########
@@ -605,6 +714,15 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
     /// Return an error if the source array is not set
     fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;
+    /// Copy rows at the given indices from the current source array into the in-progress array
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
+        // Default implementation: iterate over indices from the filter
+        for idx in IndexIterator::new(filter.filter_array(), filter.count()) {
Review Comment:
I found it strange that the default implementation copied with a different iteration strategy -- upon review, it seems like this code should never be called.
Therefore I would recommend making the default implementation `panic`.
Another potential way to make this clearer would be to add a method like `supports_copy_rows_by_filter` to avoid having to special case "is primitive"
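A minimal sketch of both suggestions (`supports_copy_rows_by_filter` is a hypothetical name from this comment, and the other `InProgressArray` methods are elided):
```rust
use arrow_schema::ArrowError;
use arrow_select::filter::FilterPredicate;

trait InProgressArraySketch: std::fmt::Debug + Send + Sync {
    /// Hypothetical: report whether this implementation provides an
    /// optimized filter-based copy, so callers need not special case
    /// "is primitive"
    fn supports_copy_rows_by_filter(&self) -> bool {
        false
    }

    /// Default panics rather than copying with a different iteration
    /// strategy, since it should never be reached in practice
    fn copy_rows_by_filter(&mut self, _filter: &FilterPredicate) -> Result<(), ArrowError> {
        unreachable!("copy_rows_by_filter called on an InProgressArray that does not support it")
    }
}
```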
##########
arrow-select/src/filter.rs:
##########
@@ -37,9 +37,9 @@ use arrow_schema::*;
/// [`SlicesIterator`] to copy ranges of values. Otherwise iterate
/// over individual rows using [`IndexIterator`]
///
-/// Threshold of 0.8 chosen based on <https://dl.acm.org/doi/abs/10.1145/3465998.3466009>
+/// Threshold of 0.9 chosen based on benchmarking results
///
-const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.9;
Review Comment:
This change to the filter heuristic seems like one we should pull out into its own PR so we can test it / make it clear what changed.
##########
arrow-select/src/coalesce.rs:
##########
@@ -571,6 +788,15 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
     /// Return an error if the source array is not set
     fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;
+    /// Copy rows at the given indices from the current source array into the in-progress array
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
+        // Default implementation: iterate over indices from the filter
Review Comment:
- https://github.com/apache/arrow-rs/issues/9143
##########
arrow-select/src/filter.rs:
##########
@@ -321,7 +341,13 @@ enum IterationStrategy {
impl IterationStrategy {
    /// The default [`IterationStrategy`] for a filter of length `filter_length`
/// and selecting `filter_count` rows
- fn default_strategy(filter_length: usize, filter_count: usize) -> Self {
+ ///
+ /// Returns:
+ /// - [`IterationStrategy::None`] if `filter_count` is 0
+ /// - [`IterationStrategy::All`] if `filter_count == filter_length`
+ /// - [`IterationStrategy::SlicesIterator`] if selectivity > 80%
Review Comment:
I think you changed the threshold to 90%
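For reference, a simplified sketch of the selection logic the doc comment describes (the real `default_strategy` has additional cases, e.g. precomputed `Slices`/`Indices` variants, so this is not the full implementation):
```rust
const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.9;

enum IterationStrategy {
    None,           // no rows selected
    All,            // every row selected
    SlicesIterator, // copy contiguous runs of selected values
    IndexIterator,  // visit selected indices one at a time
}

fn default_strategy(filter_length: usize, filter_count: usize) -> IterationStrategy {
    if filter_count == 0 {
        return IterationStrategy::None;
    }
    if filter_count == filter_length {
        return IterationStrategy::All;
    }
    // Mostly-selected filters benefit from slice-wise copies; sparse
    // filters are better served by per-index iteration
    if filter_count as f64 / filter_length as f64 > FILTER_SLICES_SELECTIVITY_THRESHOLD {
        IterationStrategy::SlicesIterator
    } else {
        IterationStrategy::IndexIterator
    }
}
```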
##########
arrow-select/src/coalesce.rs:
##########
@@ -238,10 +245,112 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // We only support primitive now, fallback to filter_record_batch for other types
+        // Also, skip optimization when filter is not very selective
+        if batch
+            .schema()
+            .fields()
+            .iter()
+            .any(|field| !field.data_type().is_primitive())
Review Comment:
Yeah I am thinking about that as well
##########
arrow-select/src/coalesce/primitive.rs:
##########
@@ -95,6 +96,145 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for InProgressPrimitiveArray
         Ok(())
     }
+    /// Copy rows using a predicate
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
+        self.ensure_capacity();
+
+        let s = self
+            .source
+            .as_ref()
+            .ok_or_else(|| {
+                ArrowError::InvalidArgumentError(
+                    "Internal Error: InProgressPrimitiveArray: source not set".to_string(),
+                )
+            })?
+            .as_primitive::<T>();
+
+        let values = s.values();
+        let count = filter.count();
+
+        // Use the predicate's strategy for optimal iteration
+        match filter.strategy() {
+            IterationStrategy::SlicesIterator => {
+                // Copy values, nulls using slices
+                if let Some(nulls) = s.nulls().filter(|n| n.null_count() > 0) {
Review Comment:
I don't know if it would matter, but one difference from the filter kernel is that the filter kernel handles values in one loop and then nulls in a second (`filter_bits`) -- that is, a single loop that copies the values, then a second loop/iterator that copies the nulls, if any.
Doing so would keep the inner loop smaller and make it easier to reuse the null filtering code between kernels.
However, this is also something we can do as a follow-on PR / refactor when we add a second array type.
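A minimal sketch of that two-pass shape (assuming the same `SlicesIterator` strategy as above; `current` and `null_builder` stand in for the builder fields of `InProgressPrimitiveArray`):
```rust
use arrow_array::{types::ArrowPrimitiveType, BooleanArray, PrimitiveArray};
use arrow_buffer::NullBufferBuilder;
use arrow_select::filter::SlicesIterator;

/// Copy values in one pass, then nulls in a second, mirroring the
/// values/filter_bits split in the filter kernel
fn copy_two_pass<T: ArrowPrimitiveType>(
    source: &PrimitiveArray<T>,
    predicate: &BooleanArray,
    current: &mut Vec<T::Native>,
    null_builder: &mut NullBufferBuilder,
) {
    // Pass 1: copy the selected value ranges
    let mut copied = 0;
    for (start, end) in SlicesIterator::new(predicate) {
        current.extend_from_slice(&source.values()[start..end]);
        copied += end - start;
    }
    // Pass 2: copy validity bits only when the source actually has nulls
    if let Some(nulls) = source.nulls().filter(|n| n.null_count() > 0) {
        for (start, end) in SlicesIterator::new(predicate) {
            null_builder.append_buffer(&nulls.slice(start, end - start));
        }
    } else {
        // Keep the null builder in step with the values copied in pass 1
        null_builder.append_n_non_nulls(copied);
    }
}
```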
##########
arrow-select/src/coalesce.rs:
##########
@@ -238,10 +245,112 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // We only support primitive now, fallback to filter_record_batch for other types
+        // Also, skip optimization when filter is not very selective
+        if batch
+            .schema()
+            .fields()
+            .iter()
+            .any(|field| !field.data_type().is_primitive())
+            || self
+                .biggest_coalesce_batch_size
+                .map(|biggest_size| filter.true_count() > biggest_size)
+                .unwrap_or(false)
+        {
+            let batch = filter_record_batch(&batch, filter)?;
+
+            self.push_batch(batch)?;
+            return Ok(());
+        }
+
+        // Build an optimized filter predicate that chooses the best iteration strategy
+        let is_optimize_beneficial = is_optimize_beneficial_record_batch(&batch);
+        let selected_count = filter.true_count();
+
+        // Fast path: skip if no rows selected
+        if selected_count == 0 {
+            return Ok(());
+        }
+
+        // Fast path: if all rows selected, just push the batch
+        if selected_count == batch.num_rows() {
+            return self.push_batch(batch);
+        }
+
+        let (_schema, arrays, _num_rows) = batch.into_parts();
+
+        // Setup input arrays as sources
+        assert_eq!(arrays.len(), self.in_progress_arrays.len());
+        self.in_progress_arrays
+            .iter_mut()
+            .zip(&arrays)
+            .for_each(|(in_progress, array)| {
+                in_progress.set_source(Some(Arc::clone(array)));
+            });
Review Comment:
I think you can avoid this `Arc::clone` (probably not a big deal but every little bit helps)
```suggestion
        self.in_progress_arrays
            .iter_mut()
            .zip(arrays)
            .for_each(|(in_progress, array)| {
                in_progress.set_source(Some(array));
            });
```
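This works because `into_parts` returns the arrays by value (a `Vec<ArrayRef>`), so zipping over `arrays` rather than `&arrays` moves each `ArrayRef` into `set_source` instead of bumping the `Arc` refcount.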
##########
arrow-select/src/coalesce.rs:
##########
@@ -237,10 +243,101 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // We only support primitive now, fallback to filter_record_batch for other types
+        // Also, skip optimization when filter is not very selective
Review Comment:
Yeah, I do think it would be good to take into account
`biggest_coalesce_batch_size` but maybe we can do so as a follow on PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]