alamb commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115668031


##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -689,6 +702,46 @@ fn filter_and_project(
         })
 }
 
+impl FilterExecStream {
+    /// Evaluates the predicate filter on the given batch and appends any rows that match
+    /// to the in-progress output batch builder.
+    fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        self.predicate
+            .evaluate(&batch)
+            .and_then(|v| v.into_array(batch.num_rows()))
+            .and_then(|filter| {
+                let Some(filter) = filter.as_boolean_opt() else {
+                    return internal_err!(
+                        "Cannot create filter_array from non-boolean predicates"
+                    );
+                };
+
+                let batch = match self.projection.as_ref() {
+                    Some(projection) => {
+                        let projected_columns = projection
+                            .iter()
+                            .map(|i| Arc::clone(batch.column(*i)))
+                            .collect();
+                        // Safety -- the input was a valid RecordBatch and thus the projection is too
+                        unsafe {
+                            RecordBatch::new_unchecked(
+                                Arc::clone(&self.schema),
+                                projected_columns,
+                                batch.num_rows(),
+                            )
+                        }
+                    }
+                    None => batch,
+                };
+                let output_batch_builder = self
+                    .output_batch_builder
+                    .as_mut()
+                    .expect("output_batch_builder should be Some");
+                Ok(output_batch_builder.append_filtered(batch, filter)?)

Review Comment:
   > This avoids reallocations / copying as the target capacity can be calculated.
   > In order to avoid buffering too many batches we probably have to limit this / create a batch anyway after x batches or after x megabytes in memory.
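   
   A rough illustration of that suggested cap (a hedged sketch -- the struct, field names, and thresholds here are hypothetical, not part of this PR):
   
   ```rust
   // Hypothetical sketch: flush the in-progress output batch once it holds
   // too many rows or too many bytes, whichever limit is hit first.
   struct FlushPolicy {
       max_buffered_rows: usize,
       max_buffered_bytes: usize,
   }
   
   impl FlushPolicy {
       /// True when the builder should emit a batch even though the target
       /// `batch_size` has not been reached yet.
       fn should_flush(&self, buffered_rows: usize, buffered_bytes: usize) -> bool {
           buffered_rows >= self.max_buffered_rows || buffered_bytes >= self.max_buffered_bytes
       }
   }
   ```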
   
   I was trying to avoid having any reallocations in the [`IncrementalRecordBatchBuilder`](https://github.com/apache/arrow-rs/pull/7513/files#diff-52e0762696101af1d088555b3d8bbe861f53dc4e9e6a6edd37eee22f950ec742R48) -- since we know the target output batch size (`batch_size`), it knows up front how much space each batch will take and can allocate it all at once (the `instantiate_builder` function creates the builders with `with_capacity`).
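   
   That pre-allocation idea looks roughly like this (a simplified sketch, not the actual `IncrementalRecordBatchBuilder` code; `builders_for` is a hypothetical helper):
   
   ```rust
   use arrow::array::{ArrayBuilder, Int64Builder, StringBuilder};
   use arrow::datatypes::{DataType, Schema};
   
   /// Create a builder sized for `batch_size` rows up front, so appends
   /// never trigger a reallocation of the underlying buffers.
   fn instantiate_builder(data_type: &DataType, batch_size: usize) -> Box<dyn ArrayBuilder> {
       match data_type {
           DataType::Int64 => Box::new(Int64Builder::with_capacity(batch_size)),
           // String builders also take an estimated data-byte capacity; with 0
           // the values buffer can still grow, but the offsets are pre-sized.
           DataType::Utf8 => Box::new(StringBuilder::with_capacity(batch_size, 0)),
           other => unimplemented!("sketch only covers a few types: {other:?}"),
       }
   }
   
   /// One pre-sized builder per output column.
   fn builders_for(schema: &Schema, batch_size: usize) -> Vec<Box<dyn ArrayBuilder>> {
       schema
           .fields()
           .iter()
           .map(|field| instantiate_builder(field.data_type(), batch_size))
           .collect()
   }
   ```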
   
   However, now that I think about it, after a call to `finish()` the updated builder doesn't have the right allocation 🤔
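   
   If that turns out to be the problem, one speculative fix (reusing the hypothetical `builders_for` helper from the sketch above) would be to re-create the builders inside `finish()`:
   
   ```rust
   use std::sync::Arc;
   use arrow::array::{ArrayBuilder, ArrayRef};
   use arrow::datatypes::SchemaRef;
   use arrow::record_batch::RecordBatch;
   
   /// Hypothetical stand-in for the incremental builder's state.
   struct IncrementalBuilderSketch {
       schema: SchemaRef,
       batch_size: usize,
       builders: Vec<Box<dyn ArrayBuilder>>,
   }
   
   impl IncrementalBuilderSketch {
       fn finish(&mut self) -> RecordBatch {
           let arrays: Vec<ArrayRef> =
               self.builders.iter_mut().map(|b| b.finish()).collect();
           // Without this step the drained builders would start the next
           // batch with no pre-allocated capacity -- the issue noted above.
           self.builders = builders_for(&self.schema, self.batch_size);
           RecordBatch::try_new(Arc::clone(&self.schema), arrays)
               .expect("builder arrays match the schema")
       }
   }
   ```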
   
   I'll look into that more later today
   
   
   


