ajegou opened a new issue, #22849:
URL: https://github.com/apache/datafusion/issues/22849
### Describe the bug
`TopK::insert_batch` (in `datafusion/physical-plan/src/topk/mod.rs`) calls
`attempt_early_completion(&batch)` only inside the `if replacements > 0` branch,
i.e. only when the batch successfully updated the heap. Before reaching that
branch, `insert_batch` short-circuits with `return Ok(())` when the heap's
dynamic filter rejects every row in the batch:
```rust
if !filter.has_true() {
    // nothing to filter, so no need to update
    return Ok(());
}
```
The heap's dynamic filter (`TopKDynamicFilters`, updated by `update_filter`)
is derived from the heap's worst row. A batch whose rows all come from a
strictly worse sort prefix is exactly the batch the filter rejects entirely —
which is precisely the signal `attempt_early_completion` is designed to detect
("the next batch is past the heap's boundary, we can stop"). Because the check
is gated on `replacements > 0`, it never fires for those batches, and `finished
= true` is never set. Since `finished` is the signal that tells the surrounding
stream to stop pulling from the input, the source keeps being polled long past
the point where no further row can improve the heap.
### To Reproduce
- Build a TopK over a 2-column schema `(a Int32, b Float64)` with `ORDER BY
a ASC, b ASC LIMIT 3`. Input ordering is on `a` only (the prefix).
- Batch 1: `a = [1, 1, 2]`, `b = [20.0, 15.0, 30.0]`. Fills the heap;
`heap.max` becomes `(a=2, b=30.0)`. `update_filter` tightens the filter to `a <
2 OR (a = 2 AND b < 30.0)`.
- Batch 2: `a = [3, 3]`, `b = [10.0, 20.0]`. Every row has `a = 3`, so the
filter rejects them all.
After `insert_batch(batch2)`, the expected state is `topk.finished == true`:
the last row's prefix `a = 3` is strictly greater than the heap's worst-row
prefix `a = 2`, and because the input is ordered on `a`, no future batch can
improve the heap. Observed: `topk.finished == false`.
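
The steps above can be modelled without DataFusion at all. The following is a minimal sketch on a toy stand-in, not the real `TopK`: `ToyTopK`, `Row`, `cmp_rows`, and `reproduce` are hypothetical names, the heap is a sorted `Vec`, and `replacements` is approximated by the number of rows that pass the filter. It only mirrors the control flow described in this issue (early return on full rejection, completion check gated on `replacements > 0`).

```rust
use std::cmp::Ordering;

/// Hypothetical row type for the `(a Int32, b Float64)` schema in this issue.
type Row = (i32, f64);

/// Toy stand-in for TopK state; NOT DataFusion's implementation.
struct ToyTopK {
    k: usize,
    rows: Vec<Row>, // kept sorted by (a ASC, b ASC); the last element is the worst row
    finished: bool,
}

/// Lexicographic comparison matching `ORDER BY a ASC, b ASC`.
fn cmp_rows(x: &Row, y: &Row) -> Ordering {
    x.0.cmp(&y.0).then(x.1.total_cmp(&y.1))
}

impl ToyTopK {
    /// Sets `finished` when the heap is full and the batch's last-row prefix `a`
    /// is strictly greater than the heap's worst-row prefix.
    fn attempt_early_completion(&mut self, batch: &[Row]) {
        if self.rows.len() < self.k {
            return;
        }
        if let (Some(last), Some(worst)) = (batch.last(), self.rows.last()) {
            if last.0 > worst.0 {
                self.finished = true;
            }
        }
    }

    /// Models the gating described in this issue.
    fn insert_batch(&mut self, batch: &[Row]) {
        // Heap-derived filter: once the heap is full, only rows strictly better
        // than the worst row pass.
        let worst = if self.rows.len() == self.k {
            self.rows.last().copied()
        } else {
            None
        };
        let passing: Vec<Row> = batch
            .iter()
            .copied()
            .filter(|r| worst.map_or(true, |w| cmp_rows(r, &w) == Ordering::Less))
            .collect();

        // Stand-in for the `!filter.has_true()` short-circuit.
        if passing.is_empty() {
            return;
        }

        // Toy approximation: every passing row counts as a replacement.
        let replacements = passing.len();
        self.rows.extend(passing);
        self.rows.sort_by(cmp_rows);
        self.rows.truncate(self.k);

        if replacements > 0 {
            self.attempt_early_completion(batch);
        }
    }
}

/// Runs batch 1 and batch 2 from the steps above and returns `finished`.
fn reproduce() -> bool {
    let mut topk = ToyTopK { k: 3, rows: Vec::new(), finished: false };
    topk.insert_batch(&[(1, 20.0), (1, 15.0), (2, 30.0)]);
    topk.insert_batch(&[(3, 10.0), (3, 20.0)]);
    topk.finished
}
```

In this model `reproduce()` returns `false`: batch 2 is rejected in full, so the early return skips the completion check, matching the observed state.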
### Expected behavior
`attempt_early_completion` should be invoked for every batch where its
preconditions hold (heap full, common sort prefix declared), regardless of
whether the batch updated the heap. A batch fully rejected by the heap's
dynamic filter should still be able to set `finished = true` if its last-row
prefix is strictly worse than the heap's worst.
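
One possible shape for this, sketched on a toy model rather than on the real `TopK`: the completion check is moved out of the update branch so it also runs for fully rejected batches. All names here (`ToyTopK`, `Row`, `cmp_rows`, `insert_batch_fixed`, `run_fixed`) are hypothetical, and the sorted `Vec` is only a stand-in for the heap; an actual patch to `datafusion/physical-plan/src/topk/mod.rs` may need to look quite different.

```rust
use std::cmp::Ordering;

/// Hypothetical row type for the `(a Int32, b Float64)` schema in this issue.
type Row = (i32, f64);

/// Toy stand-in for TopK state; NOT DataFusion's implementation.
struct ToyTopK {
    k: usize,
    rows: Vec<Row>, // kept sorted by (a ASC, b ASC); the last element is the worst row
    finished: bool,
}

/// Lexicographic comparison matching `ORDER BY a ASC, b ASC`.
fn cmp_rows(x: &Row, y: &Row) -> Ordering {
    x.0.cmp(&y.0).then(x.1.total_cmp(&y.1))
}

impl ToyTopK {
    /// Preconditions from this issue: heap full, and the batch's last-row
    /// prefix `a` strictly worse than the heap's worst-row prefix.
    fn attempt_early_completion(&mut self, batch: &[Row]) {
        if self.rows.len() < self.k {
            return;
        }
        if let (Some(last), Some(worst)) = (batch.last(), self.rows.last()) {
            if last.0 > worst.0 {
                self.finished = true;
            }
        }
    }

    /// Assumed fix: update the heap only when rows pass the filter, but run
    /// the completion check for every batch.
    fn insert_batch_fixed(&mut self, batch: &[Row]) {
        let worst = if self.rows.len() == self.k {
            self.rows.last().copied()
        } else {
            None
        };
        let passing: Vec<Row> = batch
            .iter()
            .copied()
            .filter(|r| worst.map_or(true, |w| cmp_rows(r, &w) == Ordering::Less))
            .collect();

        if !passing.is_empty() {
            self.rows.extend(passing);
            self.rows.sort_by(cmp_rows);
            self.rows.truncate(self.k);
        }

        // No longer gated on whether the batch changed the heap.
        self.attempt_early_completion(batch);
    }
}

/// Runs the two batches from "To Reproduce" through the assumed fix.
fn run_fixed() -> (bool, Vec<Row>) {
    let mut topk = ToyTopK { k: 3, rows: Vec::new(), finished: false };
    topk.insert_batch_fixed(&[(1, 20.0), (1, 15.0), (2, 30.0)]);
    topk.insert_batch_fixed(&[(3, 10.0), (3, 20.0)]);
    (topk.finished, topk.rows)
}
```

With the check ungated, the fully rejected batch 2 sets `finished` in this model while the heap contents stay unchanged, which is the behavior requested above.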
### Additional context
The TopK partial-prefix early-termination optimization (the
`attempt_early_completion` mechanism) was introduced by
[#15563](https://github.com/apache/datafusion/pull/15563), closing
[#15529](https://github.com/apache/datafusion/issues/15529). At that point
there was no heap-derived dynamic filter on TopK; the natural and only call
site for the check was right after a successful heap insertion.
Two months later, [#15770](https://github.com/apache/datafusion/pull/15770)
added dynamic-filter pushdown for TopK sorts. It introduced the
`!filter.has_true()` short-circuit to handle batches the heap's filter rejects
entirely. The two features address different problems and #15770 didn't connect
its new short-circuit to #15563's check — which is how the gap this issue
describes opened up.
The symptom is a silent performance regression (output of TopK is unchanged;
only the source-stopping signal fails to fire), which is likely why it went
unreported for ~5 months: default test suites verify TopK output rather than
asserting on `topk.finished` after specific batch patterns.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]