alamb commented on code in PR #8103:
URL: https://github.com/apache/arrow-rs/pull/8103#discussion_r2266795289
########## arrow-select/src/coalesce.rs: ##########
@@ -198,15 +198,139 @@ impl BatchCoalescer {
     /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
     /// assert_eq!(completed_batch, expected_batch);
     /// ```
+    /// Zero-copy implementation using `compute_filter_plan`.
     pub fn push_batch_with_filter(
         &mut self,
         batch: RecordBatch,
-        filter: &BooleanArray,
+        predicate: &BooleanArray,
     ) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // First compute the filter plan based on the predicate
+        // (calls FilterBuilder::optimize internally)
+        let plan = compute_filter_plan(predicate);
+
+        match plan {
+            FilterPlan::None => {
+                // No rows selected
+                Ok(())
+            }
+            FilterPlan::All => {
+                // All rows selected: directly call push_batch (consumes batch)
+                self.push_batch(batch)
+            }
+            FilterPlan::Slices(slices) => {
+                // Consume the batch and set sources on in_progress arrays
+                let (_schema, arrays, _nrows) = batch.into_parts();
+                assert_eq!(arrays.len(), self.in_progress_arrays.len());
+
+                self.in_progress_arrays
+                    .iter_mut()
+                    .zip(arrays)
+                    .for_each(|(in_progress, array)| {
+                        in_progress.set_source(Some(array));
+                    });
+
+                // For each contiguous slice, copy rows in chunks fitting target_batch_size
+                for (mut start, end) in slices {

Review Comment:
I suspect that to really make this fast, it will need specialized implementations for the different array types (rather than using `MutableArrayData`). I think we could yoink / reuse some of the existing code from the filter kernel: https://github.com/apache/arrow-rs/blob/04f217b6708eed2804c3b0a669a65ea111c2c5f1/arrow-select/src/filter.rs#L322

########## arrow-select/src/filter.rs: ##########
@@ -164,6 +164,44 @@ fn multiple_arrays(data_type: &DataType) -> bool {
     }
 }
 
+/// A public, lightweight plan describing how to apply a Boolean filter.
+///
+/// Used for zero-copy filtering externally (e.g., in BatchCoalescer):
+/// - `None`: no rows selected
+/// - `All`: all rows selected
+/// - `Slices`: list of continuous ranges `[start, end)` (can be used directly for `copy_rows`)
+/// - `Indices`: list of single-row indices (can be merged into continuous ranges externally)
+#[derive(Debug, Clone)]
+pub enum FilterPlan {
+    None,
+    All,
+    Slices(Vec<(usize, usize)>),
+    Indices(Vec<usize>),
+}
+
+/// Compute a filtering plan based on `FilterBuilder::optimize`.
+///
+/// This function calls `FilterBuilder::new(filter).optimize()`, then
+/// converts the optimized `IterationStrategy` into the above `FilterPlan`
+/// to enable zero-copy execution externally.
+pub fn compute_filter_plan(filter: &BooleanArray) -> FilterPlan {
+    let fb = FilterBuilder::new(filter);
+    let pred = fb.build();
+
+    match pred.strategy {
+        IterationStrategy::None => FilterPlan::None,
+        IterationStrategy::All => FilterPlan::All,
+        IterationStrategy::Slices(s) => FilterPlan::Slices(s), // moved directly
+        IterationStrategy::Indices(i) => FilterPlan::Indices(i), // moved directly
+        IterationStrategy::SlicesIterator => {
+            FilterPlan::Slices(SlicesIterator::new(&pred.filter).collect())

Review Comment:
avoiding this allocation will likely help

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
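The two review comments point in the same direction: copy selected rows with a type-specialized routine instead of `MutableArrayData`, and consume the contiguous runs lazily instead of collecting `SlicesIterator` into a `Vec<(usize, usize)>`. Below is a minimal std-only sketch of both ideas. It assumes a plain `&[bool]` mask and an `i32` column as stand-ins for arrow's `BooleanArray` bitmap and `PrimitiveArray`. `TrueRuns` and `append_filtered_i32` are hypothetical names, not arrow-rs APIs, and this is not the filter kernel's actual code.

```rust
/// Hypothetical stand-in for arrow-rs's `SlicesIterator`: yields each
/// contiguous run of `true` values as a half-open range `(start, end)`.
/// Works on a plain `&[bool]`, not on a packed `BooleanArray` bitmap.
struct TrueRuns<'a> {
    mask: &'a [bool],
    pos: usize,
}

impl<'a> TrueRuns<'a> {
    fn new(mask: &'a [bool]) -> Self {
        Self { mask, pos: 0 }
    }
}

impl Iterator for TrueRuns<'_> {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        // Skip over unselected rows.
        while self.pos < self.mask.len() && !self.mask[self.pos] {
            self.pos += 1;
        }
        if self.pos == self.mask.len() {
            return None;
        }
        let start = self.pos;
        // Extend the run while rows remain selected.
        while self.pos < self.mask.len() && self.mask[self.pos] {
            self.pos += 1;
        }
        Some((start, self.pos))
    }
}

/// Hypothetical type-specialized copy for a primitive column: appends the
/// selected rows of `values` to `out` with one `extend_from_slice` per run.
/// Runs are pulled lazily, so no intermediate `Vec` of ranges is allocated.
/// Null (validity) bitmaps are ignored in this sketch.
fn append_filtered_i32(values: &[i32], mask: &[bool], out: &mut Vec<i32>) {
    assert_eq!(values.len(), mask.len());
    for (start, end) in TrueRuns::new(mask) {
        out.extend_from_slice(&values[start..end]);
    }
}

fn main() {
    let values = [1, 2, 3, 4, 5, 6];
    let mask = [true, false, true, true, false, true];

    // Collected here only to show the runs; the copy path never collects.
    let runs: Vec<(usize, usize)> = TrueRuns::new(&mask).collect();
    assert_eq!(runs, vec![(0, 1), (2, 4), (5, 6)]);

    let mut out = Vec::new();
    append_filtered_i32(&values, &mask, &mut out);
    println!("{:?}", out); // [1, 3, 4, 6]
}
```

Because each run is copied as one contiguous slice, the only allocation on the copy path is the growth of `out`. A real implementation would also need to carry the validity bitmap and handle the other array types separately.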