Dandandan commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115400750
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -689,6 +702,46 @@ fn filter_and_project(
     })
 }
 
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends and rows that match
+    /// to the in progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)

Review Comment:
   My feeling is that this should do this "multi batch filter" and then `concat` anyway if this approach generates smaller batches, rather than using a builder approach.
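To make the suggested alternative concrete, here is a minimal std-only sketch under stated assumptions. `Batch` is a hypothetical single-column stand-in for `RecordBatch`, and the hypothetical helpers `filter_one` and `concat` play the roles that arrow's `filter_record_batch` and `concat_batches` would play in real code. The `target_rows` threshold that decides when buffered pieces get concatenated is also an assumption; the comment does not specify one. Each input batch is filtered on its own, the (possibly small) results are buffered, and they are concatenated in one step instead of being appended to an incremental builder.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: a single Int64 column.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    values: Vec<i64>,
}

/// Hypothetical helper: filter one batch with a boolean mask, producing a
/// new (possibly much smaller) batch, like arrow's `filter_record_batch`.
fn filter_one(batch: &Batch, mask: &[bool]) -> Batch {
    assert_eq!(batch.values.len(), mask.len(), "mask length must match batch length");
    let values = batch
        .values
        .iter()
        .zip(mask)
        .filter_map(|(v, keep)| if *keep { Some(*v) } else { None })
        .collect();
    Batch { values }
}

/// Hypothetical helper: concatenate batches into one, like arrow's `concat_batches`.
fn concat(batches: &[Batch]) -> Batch {
    let total: usize = batches.iter().map(|b| b.values.len()).sum();
    let mut values = Vec::with_capacity(total);
    for b in batches {
        values.extend_from_slice(&b.values);
    }
    Batch { values }
}

/// "Multi batch filter": filter each input independently, buffer the small
/// outputs, and concat once at least `target_rows` rows are pending
/// (the threshold is an assumption for this sketch).
fn filter_then_concat(inputs: &[(Batch, Vec<bool>)], target_rows: usize) -> Vec<Batch> {
    let mut output = Vec::new();
    let mut pending: Vec<Batch> = Vec::new();
    let mut pending_rows = 0;
    for (batch, mask) in inputs {
        let filtered = filter_one(batch, mask);
        if filtered.values.is_empty() {
            continue; // nothing survived the predicate; skip empty batches
        }
        pending_rows += filtered.values.len();
        pending.push(filtered);
        if pending_rows >= target_rows {
            output.push(concat(&pending));
            pending.clear();
            pending_rows = 0;
        }
    }
    // Flush whatever is left at end of input.
    if !pending.is_empty() {
        output.push(concat(&pending));
    }
    output
}

fn main() {
    let inputs = vec![
        (Batch { values: vec![1, 2, 3, 4] }, vec![true, false, false, true]),
        (Batch { values: vec![5, 6, 7] }, vec![false, true, false]),
        (Batch { values: vec![8, 9] }, vec![true, true]),
    ];
    let out = filter_then_concat(&inputs, 3);
    println!("{:?}", out);
}
```

The trade-off, roughly: filter-then-concat copies surviving rows twice (once in the filter kernel, once in the concat) but reuses existing, well-optimized kernels, whereas a builder appends filtered rows directly into a single output buffer.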