alamb commented on code in PR #18604:
URL: https://github.com/apache/datafusion/pull/18604#discussion_r2513830258


##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -696,26 +710,58 @@ impl Stream for FilterExecStream {
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         let poll;
+        let elapsed_compute = 
self.metrics.baseline_metrics.elapsed_compute().clone();
         loop {
             match ready!(self.input.poll_next_unpin(cx)) {
                 Some(Ok(batch)) => {
-                    let timer = 
self.metrics.baseline_metrics.elapsed_compute().timer();
-                    let filtered_batch = filter_and_project(
-                        &batch,
-                        &self.predicate,
-                        self.projection.as_ref(),
-                        &self.schema,
-                    )?;
-                    timer.done();
+                    let timer = elapsed_compute.timer();
+                    self.predicate.as_ref()
+                        .evaluate(&batch)
+                        .and_then(|v| v.into_array(batch.num_rows()))
+                        .and_then(|array| {
+                            Ok(match self.projection {
+                                Some(ref projection) => {
+                                    let projected_batch = 
batch.project(projection)?;
+                                    (array, projected_batch)
+                                },
+                                None => (array, batch)
+                            })
+                        }).and_then(|(array, batch)| {
+                            match as_boolean_array(&array) {
+                                // Apply filter array to record batch
+                                Ok(filter_array) => {
+                                    
self.metrics.selectivity.add_part(filter_array.true_count());
+                                    
self.metrics.selectivity.add_total(batch.num_rows());
+
+                                    
self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?;
+                                    Ok(())
+                                }
+                                Err(_) => {
+                                    internal_err!(
+                                        "Cannot create filter_array from 
non-boolean predicates"
+                                    )
+                                }
+                            }
+                        })?;
 
-                    
self.metrics.selectivity.add_part(filtered_batch.num_rows());
-                    self.metrics.selectivity.add_total(batch.num_rows());
+                    timer.done();
 
-                    // Skip entirely filtered batches
-                    if filtered_batch.num_rows() == 0 {
-                        continue;
+                    if self.batch_coalescer.has_completed_batch() {
+                        poll = Poll::Ready(Some(Ok(self
+                            .batch_coalescer
+                            .next_completed_batch()
+                            .expect("has_completed_batch is true"))));
+                        break;
+                    }
+                    continue;
+                }
+                None => {
+                    self.batch_coalescer.finish_buffered_batch().unwrap();

Review Comment:
   I think we should handle the error here and propagate it (e.g. with `?`) rather than calling `unwrap()`, which would panic if `finish_buffered_batch` fails



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to