This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 18fdd8b690 [branch-52] Fix Internal error: Assertion failed:
!self.finished: LimitedBatchCoalescer (#19785) (#19836)
18fdd8b690 is described below
commit 18fdd8b69005e502c3c6bff8fedb9306ec9bea76
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 16 07:09:24 2026 -0500
[branch-52] Fix Internal error: Assertion failed: !self.finished:
LimitedBatchCoalescer (#19785) (#19836)
## Which issue does this PR close?
- part of #19784
- related to https://github.com/apache/datafusion/pull/19785
- backport fix of https://github.com/apache/datafusion/issues/19781
## Rationale for this change
Backport the fix for a regression found by @bert-beyondloops into the 52
release line
## What changes are included in this PR?
Backport fix for https://github.com/apache/datafusion/issues/19781,
https://github.com/apache/datafusion/pull/19785 to branch-52
## Are these changes tested?
Yes
## Are there any user-facing changes?
bug fix
Co-authored-by: Bert Vermeiren
<[email protected]>
Co-authored-by: Bert Vermeiren <[email protected]>
---
datafusion/physical-plan/src/coalesce/mod.rs | 4 ++
datafusion/physical-plan/src/filter.rs | 73 +++++++++-------------
datafusion/sqllogictest/test_files/limit.slt | 2 +-
.../test_files/limit_single_row_batches.slt | 22 +++++++
4 files changed, 55 insertions(+), 46 deletions(-)
diff --git a/datafusion/physical-plan/src/coalesce/mod.rs
b/datafusion/physical-plan/src/coalesce/mod.rs
index b3947170d9..ea1a87d091 100644
--- a/datafusion/physical-plan/src/coalesce/mod.rs
+++ b/datafusion/physical-plan/src/coalesce/mod.rs
@@ -134,6 +134,10 @@ impl LimitedBatchCoalescer {
Ok(())
}
+ pub(crate) fn is_finished(&self) -> bool {
+ self.finished
+ }
+
/// Return the next completed batch, if any
pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
self.inner.next_completed_batch()
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index 674fe6692a..e724cdad64 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -26,8 +26,7 @@ use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
-use crate::coalesce::LimitedBatchCoalescer;
-use crate::coalesce::PushBatchStatus::LimitReached;
+use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::common::can_project;
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
@@ -711,23 +710,6 @@ impl FilterExecMetrics {
}
}
-impl FilterExecStream {
- fn flush_remaining_batches(
- &mut self,
- ) -> Poll<Option<std::result::Result<RecordBatch, DataFusionError>>> {
- // Flush any remaining buffered batch
- match self.batch_coalescer.finish() {
- Ok(()) => {
- Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| {
- self.metrics.selectivity.add_part(batch.num_rows());
- Ok(batch)
- }))
- }
- Err(e) => Poll::Ready(Some(Err(e))),
- }
- }
-}
-
pub fn batch_filter(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
@@ -767,10 +749,26 @@ impl Stream for FilterExecStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- let poll;
let elapsed_compute =
self.metrics.baseline_metrics.elapsed_compute().clone();
loop {
+ // If there is a completed batch ready, return it
+ if let Some(batch) = self.batch_coalescer.next_completed_batch() {
+ self.metrics.selectivity.add_part(batch.num_rows());
+ let poll = Poll::Ready(Some(Ok(batch)));
+ return self.metrics.baseline_metrics.record_poll(poll);
+ }
+
+ if self.batch_coalescer.is_finished() {
+ // If input is done and no batches are ready, return None to signal end of stream.
+ return Poll::Ready(None);
+ }
+
+ // Attempt to pull the next batch from the input stream.
match ready!(self.input.poll_next_unpin(cx)) {
+ None => {
+ self.batch_coalescer.finish()?;
+ // continue draining the coalescer
+ }
Some(Ok(batch)) => {
let timer = elapsed_compute.timer();
let status = self.predicate.as_ref()
@@ -802,37 +800,22 @@ impl Stream for FilterExecStream {
})?;
timer.done();
- if let LimitReached = status {
- poll = self.flush_remaining_batches();
- break;
- }
-
- if let Some(batch) = self.batch_coalescer.next_completed_batch() {
- self.metrics.selectivity.add_part(batch.num_rows());
- poll = Poll::Ready(Some(Ok(batch)));
- break;
- }
- continue;
- }
- None => {
- // Flush any remaining buffered batch
- match self.batch_coalescer.finish() {
- Ok(()) => {
- poll = self.flush_remaining_batches();
+ match status {
+ PushBatchStatus::Continue => {
+ // Keep pushing more batches
}
- Err(e) => {
- poll = Poll::Ready(Some(Err(e)));
+ PushBatchStatus::LimitReached => {
+ // limit was reached, so stop early
+ self.batch_coalescer.finish()?;
+ // continue draining the coalescer
}
}
- break;
- }
- value => {
- poll = Poll::Ready(value);
- break;
}
+
+ // Error case
+ other => return Poll::Ready(other),
}
}
- self.metrics.baseline_metrics.record_poll(poll)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/datafusion/sqllogictest/test_files/limit.slt
b/datafusion/sqllogictest/test_files/limit.slt
index 524304546d..96471411e0 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -871,4 +871,4 @@ DROP TABLE test_limit_with_partitions;
# Tear down src_table table:
statement ok
-DROP TABLE src_table;
+DROP TABLE src_table;
\ No newline at end of file
diff --git a/datafusion/sqllogictest/test_files/limit_single_row_batches.slt
b/datafusion/sqllogictest/test_files/limit_single_row_batches.slt
new file mode 100644
index 0000000000..fbdb0140e0
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/limit_single_row_batches.slt
@@ -0,0 +1,22 @@
+
+# minimize batch size to 1 in order to trigger different code paths
+statement ok
+set datafusion.execution.batch_size = '1';
+
+# ----
+# tests with target partition set to 1
+# ----
+statement ok
+set datafusion.execution.target_partitions = '1';
+
+
+statement ok
+CREATE TABLE filter_limit (i INT) as values (1), (2);
+
+query I
+SELECT COUNT(*) FROM (SELECT i FROM filter_limit WHERE i <> 0 LIMIT 1);
+----
+1
+
+statement ok
+DROP TABLE filter_limit;
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]