This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 18fdd8b690 [branch-52] Fix Internal error: Assertion failed: !self.finished: LimitedBatchCoalescer (#19785) (#19836)
18fdd8b690 is described below

commit 18fdd8b69005e502c3c6bff8fedb9306ec9bea76
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 16 07:09:24 2026 -0500

    [branch-52] Fix Internal error: Assertion failed: !self.finished: LimitedBatchCoalescer (#19785) (#19836)
    
    ## Which issue does this PR close?
    
    - part of #19784
    - related to https://github.com/apache/datafusion/pull/19785
    - backport fix of https://github.com/apache/datafusion/issues/19781
    
    ## Rationale for this change
    
    Backport the fix for a regression found by @bert-beyondloops into the
    52 release line
    
    ## What changes are included in this PR?
    Backport fix for https://github.com/apache/datafusion/issues/19781,
    https://github.com/apache/datafusion/pull/19785 to branch-52
    
    ## Are these changes tested?
    
    Yes
    ## Are there any user-facing changes?
    
    bug fix
    
    Co-authored-by: Bert Vermeiren <[email protected]>
    Co-authored-by: Bert Vermeiren <[email protected]>
---
 datafusion/physical-plan/src/coalesce/mod.rs       |  4 ++
 datafusion/physical-plan/src/filter.rs             | 73 +++++++++-------------
 datafusion/sqllogictest/test_files/limit.slt       |  2 +-
 .../test_files/limit_single_row_batches.slt        | 22 +++++++
 4 files changed, 55 insertions(+), 46 deletions(-)

diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs
index b3947170d9..ea1a87d091 100644
--- a/datafusion/physical-plan/src/coalesce/mod.rs
+++ b/datafusion/physical-plan/src/coalesce/mod.rs
@@ -134,6 +134,10 @@ impl LimitedBatchCoalescer {
         Ok(())
     }
 
+    pub(crate) fn is_finished(&self) -> bool {
+        self.finished
+    }
+
     /// Return the next completed batch, if any
     pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
         self.inner.next_completed_batch()
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 674fe6692a..e724cdad64 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -26,8 +26,7 @@ use super::{
     ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
-use crate::coalesce::LimitedBatchCoalescer;
-use crate::coalesce::PushBatchStatus::LimitReached;
+use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
 use crate::common::can_project;
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
@@ -711,23 +710,6 @@ impl FilterExecMetrics {
     }
 }
 
-impl FilterExecStream {
-    fn flush_remaining_batches(
-        &mut self,
-    ) -> Poll<Option<std::result::Result<RecordBatch, DataFusionError>>> {
-        // Flush any remaining buffered batch
-        match self.batch_coalescer.finish() {
-            Ok(()) => {
-                
Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| {
-                    self.metrics.selectivity.add_part(batch.num_rows());
-                    Ok(batch)
-                }))
-            }
-            Err(e) => Poll::Ready(Some(Err(e))),
-        }
-    }
-}
-
 pub fn batch_filter(
     batch: &RecordBatch,
     predicate: &Arc<dyn PhysicalExpr>,
@@ -767,10 +749,26 @@ impl Stream for FilterExecStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        let poll;
         let elapsed_compute = 
self.metrics.baseline_metrics.elapsed_compute().clone();
         loop {
+            // If there is a completed batch ready, return it
+            if let Some(batch) = self.batch_coalescer.next_completed_batch() {
+                self.metrics.selectivity.add_part(batch.num_rows());
+                let poll = Poll::Ready(Some(Ok(batch)));
+                return self.metrics.baseline_metrics.record_poll(poll);
+            }
+
+            if self.batch_coalescer.is_finished() {
+                // If input is done and no batches are ready, return None to signal end of stream.
+                return Poll::Ready(None);
+            }
+
+            // Attempt to pull the next batch from the input stream.
             match ready!(self.input.poll_next_unpin(cx)) {
+                None => {
+                    self.batch_coalescer.finish()?;
+                    // continue draining the coalescer
+                }
                 Some(Ok(batch)) => {
                     let timer = elapsed_compute.timer();
                     let status = self.predicate.as_ref()
@@ -802,37 +800,22 @@ impl Stream for FilterExecStream {
                         })?;
                     timer.done();
 
-                    if let LimitReached = status {
-                        poll = self.flush_remaining_batches();
-                        break;
-                    }
-
-                    if let Some(batch) = 
self.batch_coalescer.next_completed_batch() {
-                        self.metrics.selectivity.add_part(batch.num_rows());
-                        poll = Poll::Ready(Some(Ok(batch)));
-                        break;
-                    }
-                    continue;
-                }
-                None => {
-                    // Flush any remaining buffered batch
-                    match self.batch_coalescer.finish() {
-                        Ok(()) => {
-                            poll = self.flush_remaining_batches();
+                    match status {
+                        PushBatchStatus::Continue => {
+                            // Keep pushing more batches
                         }
-                        Err(e) => {
-                            poll = Poll::Ready(Some(Err(e)));
+                        PushBatchStatus::LimitReached => {
+                            // limit was reached, so stop early
+                            self.batch_coalescer.finish()?;
+                            // continue draining the coalescer
                         }
                     }
-                    break;
-                }
-                value => {
-                    poll = Poll::Ready(value);
-                    break;
                 }
+
+                // Error case
+                other => return Poll::Ready(other),
             }
         }
-        self.metrics.baseline_metrics.record_poll(poll)
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt
index 524304546d..96471411e0 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -871,4 +871,4 @@ DROP TABLE test_limit_with_partitions;
 
 # Tear down src_table table:
 statement ok
-DROP TABLE src_table;
+DROP TABLE src_table;
\ No newline at end of file
diff --git a/datafusion/sqllogictest/test_files/limit_single_row_batches.slt b/datafusion/sqllogictest/test_files/limit_single_row_batches.slt
new file mode 100644
index 0000000000..fbdb0140e0
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/limit_single_row_batches.slt
@@ -0,0 +1,22 @@
+
+#  minimize batch size to 1 in order to trigger different code paths
+statement ok
+set datafusion.execution.batch_size = '1';
+
+# ----
+# tests with target partition set to 1
+# ----
+statement ok
+set datafusion.execution.target_partitions = '1';
+
+
+statement ok
+CREATE TABLE filter_limit (i INT) as values (1), (2);
+
+query I
+SELECT COUNT(*) FROM (SELECT i FROM filter_limit WHERE i <> 0 LIMIT 1);
+----
+1
+
+statement ok
+DROP TABLE filter_limit;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to