ajegou opened a new pull request, #22852:
URL: https://github.com/apache/datafusion/pull/22852
…atch
## Which issue does this PR close?
- Closes #22849.
## Rationale for this change
`TopK::insert_batch` short-circuits when the heap's dynamic filter rejects
every row in a batch:
```rust
if !filter.has_true() {
    // nothing to filter, so no need to update
    return Ok(());
}
```
The early-exit check `attempt_early_completion(&batch)` lives later in the
same function, gated on `replacements > 0`. So a batch that the filter rejects
entirely bypasses the check.
The heap's dynamic filter is derived from the heap's worst row (via
`update_filter`). A batch whose rows all have a strictly worse sort prefix is
exactly the kind of batch the filter rejects entirely. In other words, the very
signal `attempt_early_completion` is designed to detect ("the next batch is
past the heap's boundary, we can stop") is what makes the function
short-circuit *before* the check runs.
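The overlap between "rejected by the filter" and "past the boundary" can be made concrete with two hypothetical predicates. This is a sketch under simplifying assumptions: rows are `(a, b)` pairs sorted ascending, `a` is the prefix the input is ordered on, and `passes_filter`, `past_boundary` and `check_reachable_pre_fix` are illustrative names rather than DataFusion functions. Any row with `a > worst.a` fails both disjuncts of the filter, so a batch that is entirely past the boundary never reaches the check.

```rust
/// A row `(a, b)` of an input sorted ascending by `(a, b)`, where `a` is the
/// sort prefix the input is already ordered on.
type Row = (i64, i64);

/// Stand-in for the heap-derived dynamic filter: a row passes only if it sorts
/// strictly before the heap's worst row, i.e.
/// `a < worst.a OR (a = worst.a AND b < worst.b)`.
fn passes_filter(worst: Row, row: Row) -> bool {
    row.0 < worst.0 || (row.0 == worst.0 && row.1 < worst.1)
}

/// Stand-in for the prefix comparison behind `attempt_early_completion`:
/// the batch's last row has a strictly larger prefix than the worst row.
fn past_boundary(worst: Row, batch: &[Row]) -> bool {
    batch.last().is_some_and(|last| last.0 > worst.0)
}

/// Pre-fix, reaching the check requires at least one row to pass the filter
/// (`filter.has_true()`); the `replacements > 0` gate comes on top of that,
/// so this is a necessary condition only.
fn check_reachable_pre_fix(worst: Row, batch: &[Row]) -> bool {
    batch.iter().any(|&row| passes_filter(worst, row))
}
```

With a worst row of `(2, 30)` and a batch whose rows all have `a = 3`, `past_boundary` is true while `check_reachable_pre_fix` is false.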
This is a feature-interaction regression between two PRs that were both
correct in isolation. The `attempt_early_completion` mechanism was added by
#15563 (closing #15529). At the time, there was no heap-derived dynamic filter
on TopK, so the only sensible call site was right after a successful heap
insertion. Two months later, #15770 added the dynamic-filter pushdown for TopK
sorts, introducing the `!filter.has_true()` short-circuit. The two features
address different problems, and the new short-circuit was never wired to the
existing prefix-completion check; that is how this gap opened up.
**Consequence**: on a TopK over an input ordered on the sort prefix,
`finished = true` is never set once the heap stabilizes. Since `finished` is
the signal `SortExec` uses to stop pulling from its input (via
`Poll::Ready(None)` from the TopK stream, which cascades into dropping the
source stream), the source keeps being polled long past the point where no
further row can improve the heap. The LIMIT optimization effectively degrades
to "heap saves memory but reads everything"; sources with cancellable streams
(e.g. networked sources) never receive the cancellation signal.
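The consumer-side pattern described above can be sketched as a loop that pulls batches until the TopK reports `finished`. This is a hypothetical simplification: `prefix_ordered_input` and `batches_pulled` are illustrative names, the closure stands in for `insert_batch` followed by a read of `finished`, and leaving the loop plays the role of dropping the source stream.

```rust
/// Hypothetical input: `n` batches ordered on the sort prefix `a`, where
/// batch `a` holds the rows `(a, 0)` and `(a, 1)`. Batches are produced
/// lazily, so a batch that is never pulled is never produced.
fn prefix_ordered_input(n: i64) -> impl Iterator<Item = Vec<(i64, i64)>> {
    (1..=n).map(|a| vec![(a, 0), (a, 1)])
}

/// Sketch of the driver loop: pull a batch, hand it to the TopK, and stop as
/// soon as the TopK reports `finished`. `insert_then_finished` stands in for
/// `insert_batch` followed by reading the `finished` flag.
/// Returns how many batches were pulled from the input.
fn batches_pulled<I, F>(input: I, mut insert_then_finished: F) -> usize
where
    I: IntoIterator<Item = Vec<(i64, i64)>>,
    F: FnMut(&[(i64, i64)]) -> bool,
{
    let mut pulled = 0;
    for batch in input {
        pulled += 1;
        if insert_then_finished(batch.as_slice()) {
            // Leaving the loop drops the rest of the input unpolled.
            break;
        }
    }
    pulled
}
```

If `finished` never flips (the stabilized-heap case above), all 100 batches of `prefix_ordered_input(100)` are pulled; if it flips once a batch's prefix passes a boundary of `a = 2`, the loop stops after the third batch.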
## What changes are included in this PR?
Single behavioral change in `datafusion/physical-plan/src/topk/mod.rs`: call
`attempt_early_completion(&batch)` immediately before the `return Ok(())` in
the `!filter.has_true()` branch.
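To make the scope of the one-line change concrete, here is a hypothetical boolean sketch of when the early-completion check is reached, before and after the fix. `BatchOutcome` and `early_completion_check_runs` are illustrative names, not DataFusion APIs; the sketch only models control flow, not what the check decides.

```rust
/// Hypothetical summary of one `insert_batch` call; illustrative, not a
/// DataFusion type.
struct BatchOutcome {
    /// Whether any row passed the dynamic filter (`filter.has_true()`).
    any_row_passes_filter: bool,
    /// How many rows were actually inserted into the heap.
    replacements: usize,
}

/// Whether `attempt_early_completion` is reached for this batch under the
/// pre-fix (`fix_applied == false`) or post-fix (`fix_applied == true`)
/// control flow.
fn early_completion_check_runs(outcome: &BatchOutcome, fix_applied: bool) -> bool {
    if !outcome.any_row_passes_filter {
        // Short-circuit branch: the fix calls the check just before returning.
        return fix_applied;
    }
    // Unchanged path: the existing call stays gated on `replacements > 0`.
    outcome.replacements > 0
}
```

The case where some rows pass the filter but none are inserted still skips the check in both flows; that is the separate gap this PR leaves for a possible follow-up.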
Why this narrow scope rather than a broader restructuring:
- The existing `attempt_early_completion` call inside `if replacements > 0`
is load-bearing for a related case: a batch containing a mix of "still
valuable" rows and "past the boundary" rows. The existing
`test_try_finish_marks_finished_with_prefix` test covers this case — Batch 2
with `a=[2,3], b=[10,20]` against a heap where `heap.max.a = 2`; the `(2, 10)`
row must be inserted before the check on the `(3, 20)` last row triggers.
Moving the call ahead of the insertion loop would skip inserting those
still-valuable rows and break that test.
- The bug is specifically that the *short-circuit* path doesn't call the
check. The fix targets exactly that path.
- A related but separate gap is not addressed here: when
`filter.has_true() == true` but `replacements == 0` (the filter accepts some rows but
`find_new_topk_items` ends up inserting none of them), the existing call inside
`if replacements > 0` is also skipped. This requires a divergence between the
heap's filter predicate and the row-byte comparison used inside
`find_new_topk_items`, which shouldn't normally happen (the filter is derived
from the heap's worst row using the same comparator). A deterministic synthetic
repro would likely require concurrent heap updates from sibling partitions or
boundary-value edge cases (NaN/NULL semantics, type coercion). Happy to send a
follow-up if reviewers want it covered; empirically, the workload that
motivated this fix hit the filter-rejection case.
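The ordering constraint in the mixed-prefix case can be illustrated with a hypothetical sorted-`Vec` stand-in for the heap. Only the batch `a=[2,3], b=[10,20]` and the worst-row prefix `a = 2` come from the description above; the other heap contents and all function names are illustrative assumptions. `check_first = true` models the rejected alternative of running the check before insertion and stopping as soon as it fires.

```rust
/// A row `(a, b)`; rows sort ascending by `(a, b)` and `a` is the prefix.
type Row = (i64, i64);

/// Hypothetical stand-in for a heap insert: `topk` is kept sorted ascending,
/// so `topk[k - 1]` is the worst kept row once the buffer is full.
/// Assumes `k >= 1`. Returns whether `row` was inserted.
fn try_insert(topk: &mut Vec<Row>, k: usize, row: Row) -> bool {
    if topk.len() < k {
        topk.push(row);
    } else if row < topk[k - 1] {
        topk[k - 1] = row; // evict the current worst row
    } else {
        return false;
    }
    topk.sort();
    true
}

/// Prefix test in the spirit of `attempt_early_completion`: the buffer is
/// full and the batch's last row has a strictly larger prefix than the worst.
fn past_boundary(topk: &[Row], k: usize, batch: &[Row]) -> bool {
    match (topk.last(), batch.last()) {
        (Some(worst), Some(last)) => topk.len() >= k && last.0 > worst.0,
        _ => false,
    }
}

/// Processes one batch and returns `finished`. `check_first = false` mirrors
/// the existing order (insert, then check); `check_first = true` checks first
/// and returns without inserting any row of this batch if the check fires.
fn process_batch(topk: &mut Vec<Row>, k: usize, batch: &[Row], check_first: bool) -> bool {
    if check_first && past_boundary(topk, k, batch) {
        return true;
    }
    let mut replacements = 0;
    for &row in batch {
        if try_insert(topk, k, row) {
            replacements += 1;
        }
    }
    replacements > 0 && past_boundary(topk, k, batch)
}

fn mixed_prefix_scenario(check_first: bool) -> (bool, Vec<Row>) {
    let k = 3;
    // Illustrative heap contents; the worst kept row has prefix a = 2.
    let mut topk: Vec<Row> = vec![(1, 10), (1, 20), (2, 30)];
    // Batch 2: a = [2, 3], b = [10, 20].
    let finished = process_batch(&mut topk, k, &[(2, 10), (3, 20)], check_first);
    (finished, topk)
}
```

With the existing order, `(2, 10)` displaces `(2, 30)` before the check fires; checking first also sets `finished`, but leaves `(2, 30)` wrongly in the result.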
## Are these changes tested?
Yes. Added a regression test
`test_try_finish_fires_when_filter_rejects_entire_batch`. The assertion target
is `topk.finished`, the flag that signals "stop pulling from the source" to
upstream consumers (checked by `SortExec` after each `insert_batch` call to
decide whether to keep polling its input).
Asserting that the flag transitions on the fully-filter-rejected batch is
equivalent to asserting that the source-stopping mechanism activates.
- Builds a TopK over a `(a, b)` sort with prefix `a`, k=3.
- Inserts a batch that fills the heap with rows from `a ∈ {1, 2}`;
`update_filter` tightens the filter to `a < 2 OR (a = 2 AND b < 30)`.
- Inserts a second batch with all rows at `a = 3` — filter rejects every row.
- Without the fix: `insert_batch` short-circuits, `topk.finished` stays
`false`. Test fails.
- With the fix: `attempt_early_completion` fires (last-row prefix `a = 3` >
heap.max prefix `a = 2`), `topk.finished` becomes `true`. Test passes.
The test also asserts the emitted top-K is unchanged from after batch 1,
confirming no candidate row was incorrectly excluded by the early bail.
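The shape of the regression test can be replayed against a heavily simplified model of TopK with the fix applied. `TopKModel`, its methods, and the row values are hypothetical (chosen so the worst kept row after batch 1 is `(2, 30)`, matching the filter described above); this is not DataFusion's implementation, which operates on Arrow record batches and compares row-encoded sort keys.

```rust
use std::collections::BinaryHeap;

/// A row `(a, b)`; rows sort ascending by `(a, b)` and `a` is the prefix.
type Row = (i64, i64);

/// Heavily simplified TopK model (not DataFusion's types) with the fix applied.
struct TopKModel {
    k: usize,
    heap: BinaryHeap<Row>, // max-heap: `peek()` is the worst row kept so far
    finished: bool,
}

impl TopKModel {
    fn new(k: usize) -> Self {
        TopKModel { k, heap: BinaryHeap::new(), finished: false }
    }

    /// Dynamic-filter stand-in: once full, keep only rows strictly before the worst.
    fn passes_filter(&self, row: &Row) -> bool {
        match self.heap.peek() {
            Some(worst) if self.heap.len() >= self.k => row < worst,
            _ => true,
        }
    }

    /// `attempt_early_completion` stand-in: heap full and the batch's last row
    /// has a strictly larger prefix than the worst kept row.
    fn attempt_early_completion(&mut self, batch: &[Row]) {
        if self.heap.len() < self.k {
            return;
        }
        if let (Some(last), Some(worst)) = (batch.last(), self.heap.peek()) {
            if last.0 > worst.0 {
                self.finished = true;
            }
        }
    }

    fn insert_batch(&mut self, batch: &[Row]) {
        let mask: Vec<bool> = batch.iter().map(|r| self.passes_filter(r)).collect();
        if !mask.iter().any(|&m| m) {
            self.attempt_early_completion(batch); // the fix: check before bailing
            return;
        }
        let mut replacements = 0;
        for (row, keep) in batch.iter().zip(mask) {
            if !keep {
                continue;
            }
            if self.heap.len() < self.k {
                self.heap.push(*row);
                replacements += 1;
            } else if self.heap.peek().is_some_and(|worst| row < worst) {
                self.heap.pop();
                self.heap.push(*row);
                replacements += 1;
            }
        }
        if replacements > 0 {
            self.attempt_early_completion(batch);
        }
    }

    /// Kept rows in ascending order.
    fn emit(&self) -> Vec<Row> {
        self.heap.clone().into_sorted_vec()
    }
}

/// Returns (top-K after batch 1, finished after batch 1,
///          top-K after batch 2, finished after batch 2).
fn regression_scenario() -> (Vec<Row>, bool, Vec<Row>, bool) {
    let mut topk = TopKModel::new(3);
    // Batch 1 fills the heap from a in {1, 2}; the worst kept row becomes (2, 30).
    topk.insert_batch(&[(1, 10), (1, 20), (2, 30), (2, 40)]);
    let (after1, finished1) = (topk.emit(), topk.finished);
    // Batch 2: every row has a = 3, so the filter rejects all of them.
    topk.insert_batch(&[(3, 5), (3, 6)]);
    (after1, finished1, topk.emit(), topk.finished)
}
```

Removing the `attempt_early_completion` call from the short-circuit branch leaves `finished` false after batch 2, which is the pre-fix behavior.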
All 28 existing `topk::` tests continue to pass (including
`test_try_finish_marks_finished_with_prefix`, which exercises the mixed-prefix
case).
## Are there any user-facing changes?
No public API or output changes. The fix only changes when TopK marks itself
`finished = true`: it now runs `attempt_early_completion` for batches that are
entirely rejected by the heap's dynamic filter, where previously the check was
silently skipped. TopK's output is unchanged; only the early-exit behavior
improves.
--
This is an automated message from the Apache Git Service.