alamb commented on code in PR #19785:
URL: https://github.com/apache/datafusion/pull/19785#discussion_r2690261714
##########
datafusion/sqllogictest/test_files/limit.slt:
##########
@@ -872,3 +872,25 @@ DROP TABLE test_limit_with_partitions;
# Tear down src_table table:
statement ok
DROP TABLE src_table;
+
+
+
+# tests with target partition set to 1
+statement ok
+set datafusion.execution.target_partitions = '1';
Review Comment:
The problem with these settings, as people have observed, is that they
affect all subsequent tests.
I like @pepijnve's suggestion -- let's put this in a separate test file.
Perhaps something like `limit_single_row_batches.slt`.
Using a setup of one target partition and a batch size of one to exercise
all the limit corner cases is a great idea. We should probably also add some
more basic cases to cover other operators.
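To make that concrete, here is a rough sketch of what the top of such a file
could look like (the file name follows the suggestion above; the table and
query are just illustrative placeholders, not part of this PR):
```
# In limit_single_row_batches.slt: force a single partition and
# single-row batches so limit corner cases are exercised.
statement ok
set datafusion.execution.target_partitions = '1';

statement ok
set datafusion.execution.batch_size = '1';

statement ok
CREATE TABLE t AS VALUES (1), (2), (3), (4);

# With batch_size = 1 every operator sees one-row batches, so an early
# limit termination is hit on every batch boundary.
query I
SELECT column1 FROM t LIMIT 2;
----
1
2
```
A teardown at the end of the file (`DROP TABLE` plus resetting the options)
would keep it self-contained, the same way `limit.slt` tears down `src_table`.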
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -767,10 +749,27 @@ impl Stream for FilterExecStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- let poll;
let elapsed_compute =
self.metrics.baseline_metrics.elapsed_compute().clone();
loop {
+ // If there is a completed batch ready, return it
+ if let Some(batch) = self.batch_coalescer.next_completed_batch() {
+ self.metrics.selectivity.add_part(batch.num_rows());
+ let poll = Poll::Ready(Some(Ok(batch)));
+ return self.metrics.baseline_metrics.record_poll(poll);
+ }
+
+ if self.batch_coalescer.is_finished() {
Review Comment:
I think this is the key change -- specifically that on subsequent calls to
`poll_next()` we just drain the coalescer rather than fetching the next input
batch and trying to push it in.
Previously, if the input returned another batch, the FilterExec would try to
add it to the BatchCoalescer, which is what triggered the assert.
This new code correctly drains the coalescer state 👍.
I suspect we haven't seen this on other queries because most/all of the
ExecutionPlans that can feed a FilterExec also implement limit pushdown. Thus
when the FilterExec calls `poll_next` on the input, the input returns None and
no additional batch is pushed to the BatchCoalescer.
The default batch size for the memory exec also means it will most often
return only a single batch.
I can't really figure out why setting the number of target partitions to 1
makes any difference (though I verified it is required to trigger the
reproducer).
Perhaps the reason @bert-beyondloops saw this in their system is that they
have some custom execution plan (one that doesn't implement limit pushdown 🤔).
This is fine -- I am just trying to explain why we haven't hit this issue
before / in our other tests.
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -767,10 +749,27 @@ impl Stream for FilterExecStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- let poll;
let elapsed_compute =
self.metrics.baseline_metrics.elapsed_compute().clone();
loop {
+ // If there is a completed batch ready, return it
+ if let Some(batch) = self.batch_coalescer.next_completed_batch() {
+ self.metrics.selectivity.add_part(batch.num_rows());
+ let poll = Poll::Ready(Some(Ok(batch)));
+ return self.metrics.baseline_metrics.record_poll(poll);
+ }
+
+ if self.batch_coalescer.is_finished() {
+                // If input is done and no batches are ready, return None to signal end of stream.
+ let poll = Poll::Ready(None);
+ return self.metrics.baseline_metrics.record_poll(poll);
Review Comment:
FWIW I don't think `record_poll` is needed here (it only records information
when there is a batch). There is nothing wrong with this call, but it is
somewhat confusing, as there are other paths below that return without
calling `record_poll`.
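For illustration only, that arm could return directly; a minimal sketch using
just the identifiers already in this diff:
```rust
if self.batch_coalescer.is_finished() {
    // Input is exhausted and the coalescer has no more completed batches.
    // record_poll only records information when a batch is produced, so
    // returning None directly is equivalent here.
    return Poll::Ready(None);
}
```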