rluvaton commented on code in PR #8951:
URL: https://github.com/apache/arrow-rs/pull/8951#discussion_r2702349885
##########
arrow-select/src/coalesce.rs:
##########
@@ -238,10 +242,100 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
- // TODO: optimize this to avoid materializing (copying the results
- // of filter to a new batch)
- let filtered_batch = filter_record_batch(&batch, filter)?;
- self.push_batch(filtered_batch)
+ // We only support primitive types for now; fall back to filter_record_batch
for other types
+ // Also, skip the optimization when the filter is not very selective
+
+ // Build an optimized filter predicate that chooses the best iteration
strategy
+ let is_optimize_beneficial =
is_optimize_beneficial_record_batch(&batch);
+ let selected_count = filter.true_count();
+ let num_rows = batch.num_rows();
+
+ // Fast path: skip if no rows selected
+ if selected_count == 0 {
+ return Ok(());
+ }
+
+ // Fast path: if all rows selected, just push the batch
+ if selected_count == num_rows {
+ return self.push_batch(batch);
+ }
+
+ let selectivity = Some(selected_count as f64 / num_rows as f64);
+ let (_schema, arrays, _num_rows) = batch.into_parts();
+
+ // Setup input arrays as sources
+ assert_eq!(arrays.len(), self.in_progress_arrays.len());
+ self.in_progress_arrays
+ .iter_mut()
+ .zip(&arrays)
+ .for_each(|(in_progress, array)| {
+ in_progress.set_source(Some(Arc::clone(array)), selectivity);
+ });
+
+ // Choose iteration strategy based on the optimized predicate
+ self.copy_from_filter(filter, is_optimize_beneficial, selected_count)?;
+
+ // Clear sources to allow memory to be freed
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.set_source(None, None);
+ }
+
+ Ok(())
+ }
+
+ /// Helper to copy rows at the given indices, handling batch boundaries
efficiently
+ ///
+ /// This method batches the index iteration to avoid per-row batch
boundary checks.
+ fn copy_from_filter(
+ &mut self,
+ filter: &BooleanArray,
+ is_optimize_beneficial: bool,
+ count: usize,
+ ) -> Result<(), ArrowError> {
+ let mut remaining = count;
+ let mut filter_pos = 0; // Position in the filter array
+
+ // Build an optimized filter predicate once for the whole input batch
+ let mut filter_builder = FilterBuilder::new(filter);
+ if is_optimize_beneficial {
+ filter_builder = filter_builder.optimize();
+ }
+ let predicate = filter_builder.build();
+
+ // We need to process the filter in chunks that fit the target batch
size
+ while remaining > 0 {
+ let space_in_batch = self.target_batch_size - self.buffered_rows;
+ let to_copy = remaining.min(space_in_batch);
+
+ // Find how many filter positions we need to cover `to_copy` set
bits
+ // Skip the expensive search if all remaining rows fit in the
current batch
+ let chunk_len = if remaining <= space_in_batch {
+ filter.len() - filter_pos
+ } else {
+ filter
+ .values()
+ .find_nth_set_bit_position(filter_pos, to_copy)
+ - filter_pos
+ };
+
+ let chunk_predicate = predicate.slice(filter_pos, chunk_len,
to_copy);
+
+ // Copy all collected indices in one call per array
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows_by_filter(&chunk_predicate, filter_pos,
chunk_len)?;
+ }
Review Comment:
A performance improvement you can do here is to copy X columns at a time, as I
did and explained in point 4 of:
- https://github.com/apache/arrow-rs/issues/9083#issuecomment-3756262948
The number 4 is a magic number; you can pick another number, such as 2, to
amortize the cost of the boolean iterations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]