ajegou opened a new issue, #22874:
URL: https://github.com/apache/datafusion/issues/22874
### Describe the bug
In multi-partition `SortExec`, each input partition gets its own `TopK`
instance but they share a single `TopKDynamicFilters`. A partition that has not
yet inserted `k` rows into its local heap (so `heap.max() == None`) can still
receive batches that are entirely rejected by the shared filter — because a
sibling partition has already filled its heap and tightened the global
threshold.
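A minimal sketch of the sharing pattern described above. It assumes a single `i64` ascending sort key where the `k` smallest values are wanted. `SharedThreshold`, `LocalTopK`, `tighten`, `passes`, and `insert_batch` are hypothetical names for illustration, not DataFusion's `TopK` / `TopKDynamicFilters` API. It shows how a sibling with a full heap can tighten the shared bound so that a partition whose heap is still empty sees every row rejected.

```rust
use std::collections::BinaryHeap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the shared dynamic filter: the tightest
/// "worst kept row" published by any partition whose heap is full.
#[derive(Default)]
pub struct SharedThreshold {
    threshold: Mutex<Option<i64>>,
}

impl SharedThreshold {
    /// Tighten the bound; it only ever moves toward smaller values.
    pub fn tighten(&self, candidate: i64) {
        let mut guard = self.threshold.lock().unwrap();
        let current: Option<i64> = *guard;
        if current.map_or(true, |cur| candidate < cur) {
            *guard = Some(candidate);
        }
    }

    /// The filter keeps only rows strictly below the bound (or all rows if unset).
    pub fn passes(&self, value: i64) -> bool {
        let current: Option<i64> = *self.threshold.lock().unwrap();
        current.map_or(true, |t| value < t)
    }
}

/// Hypothetical per-partition TopK keeping the `k` smallest values.
pub struct LocalTopK {
    k: usize,
    heap: BinaryHeap<i64>, // max-heap: peek() is the worst row currently kept
    shared: Arc<SharedThreshold>,
}

impl LocalTopK {
    pub fn new(k: usize, shared: Arc<SharedThreshold>) -> Self {
        Self { k, heap: BinaryHeap::new(), shared }
    }

    /// Push every row of `batch` that survives the shared filter, keeping
    /// only the `k` smallest. Returns how many rows passed the filter.
    pub fn insert_batch(&mut self, batch: &[i64]) -> usize {
        let mut accepted = 0;
        for &v in batch {
            if !self.shared.passes(v) {
                continue; // rejected by a bound that a sibling may have set
            }
            self.heap.push(v);
            if self.heap.len() > self.k {
                self.heap.pop(); // evict the largest
            }
            accepted += 1;
        }
        // Once full, publish this partition's worst row as the shared bound.
        if self.heap.len() == self.k {
            if let Some(&worst) = self.heap.peek() {
                self.shared.tighten(worst);
            }
        }
        accepted
    }

    pub fn len(&self) -> usize {
        self.heap.len()
    }
}
```

In this model, after partition 0 inserts `[1, 2, 3, 4]` with `k = 3`, the shared bound becomes `3`, and partition 1 then rejects all of `[10, 11, 12]` while its own heap stays empty.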
When this happens, `attempt_early_completion` cannot conclude termination
because of an early guard:
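```rust
// `heap.max()` is `None` until this partition's heap holds `k` rows,
// so a partition with a non-full heap always returns here.
let Some(max_topk_row) = self.heap.max() else {
    return Ok(());
};
```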
`heap.max()` only returns `Some` once the local heap is full, so the lagging
partition can never bail this way. It keeps draining its sorted input even
though no row from it can improve the global top-k.
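The local-only guard can be modeled in isolation. This is a sketch under stated assumptions: one ascending `i64` key, input sorted on that key, and hypothetical names (`BoundedHeap`, `can_complete_locally`) that only mirror the behavior described in the issue, where `max()` is `Some` only once the heap holds `k` rows.

```rust
use std::collections::BinaryHeap;

/// Hypothetical model of the local heap: `max()` is `Some` only when full.
pub struct BoundedHeap {
    k: usize,
    inner: BinaryHeap<i64>,
}

impl BoundedHeap {
    pub fn new(k: usize) -> Self {
        Self { k, inner: BinaryHeap::new() }
    }

    /// Keep only the `k` smallest values seen so far.
    pub fn push(&mut self, v: i64) {
        self.inner.push(v);
        if self.inner.len() > self.k {
            self.inner.pop();
        }
    }

    pub fn max(&self) -> Option<i64> {
        if self.inner.len() == self.k {
            self.inner.peek().copied()
        } else {
            None
        }
    }
}

/// Local-only early-completion check: with input sorted ascending, once the
/// next key exceeds the local worst row, no later row can enter the heap.
pub fn can_complete_locally(heap: &BoundedHeap, next_key: i64) -> bool {
    let Some(max_row) = heap.max() else {
        return false; // same early exit as the guard: no local bound yet
    };
    next_key > max_row
}
```

However large `next_key` is, an empty heap always yields `false`, which is exactly why a lagging partition never stops under this check alone.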
This is a separate failure mode from #22849 (which #22852 fixes). #22849 and
#22852 cover the case where the local heap is full and the filter rejects an
entire batch; in that case the existing `heap.max()` check succeeds and
termination can be concluded. The case here is when the local heap is empty:
the local check has no max row to compare against, so the source-stopping
signal is never sent, no matter how tight the shared filter has become.
### To Reproduce
On a partition-preserving plan (`SortExec` with default partitioning) over
an input sorted on the TopK's leading sort key:
- Partition 0 receives the lowest range of the leading key, fills its heap
to k, and tightens the shared filter to "rows beat heap.max".
- Partitions 1..N receive higher leading-key ranges. Their first batches all
fail the shared filter. Their local heaps stay empty.
- Each lagging partition keeps reading from its source until exhausted.
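The reproduction steps above can be simulated with a small model. It is a simplification: partitions are processed one after another (partition 0 first) rather than concurrently, there is a single ascending `i64` key, each partition's batches are sorted, and the only stopping rule is the local one. The function name `batches_read_with_local_guard` is hypothetical.

```rust
/// Hypothetical simulation: returns how many batches each partition reads
/// when only the local `heap.max()`-style guard can stop it.
pub fn batches_read_with_local_guard(partitions: &[Vec<Vec<i64>>], k: usize) -> Vec<usize> {
    assert!(k > 0, "k must be positive");
    let mut shared: Option<i64> = None; // shared filter keeps rows < shared
    let mut reads = Vec::new();
    for batches in partitions {
        let mut heap: Vec<i64> = Vec::new(); // k smallest values, kept sorted
        let mut read = 0;
        for batch in batches {
            // Local guard: only a full heap can conclude early completion.
            if heap.len() == k {
                if let (Some(&first), Some(&max)) = (batch.first(), heap.last()) {
                    if first > max {
                        break;
                    }
                }
            }
            read += 1;
            for &v in batch {
                if shared.map_or(false, |t| v >= t) {
                    continue; // rejected by the shared filter
                }
                heap.push(v);
                heap.sort_unstable();
                heap.truncate(k);
            }
            // A full heap tightens the shared filter.
            if heap.len() == k {
                let max = heap[k - 1];
                shared = Some(shared.map_or(max, |t| t.min(max)));
            }
        }
        reads.push(read);
    }
    reads
}
```

With `k = 3`, partition 0 holding `[[0, 1, 2], [3, 4, 5], [6, 7, 8]]` stops after one batch, while partition 1 holding `[[10, 11], [12, 13], [14, 15]]` reads all three batches even though every one of its rows is rejected.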
Empirical signal from the `sort-tpch --sorted --limit 10` benchmark in
#15563, comparing pre-#15770 (`0c3bb78e`) with post-#15770 plus #22852:
queries with a multi-key TopK on the sorted leading key (Q8, Q9, Q10) remain
~5× slower than pre-regression on default-partition runs. The same queries
return to roughly pre-regression timing once the lagging-partition bail-out
also works.
### Expected behavior
When the shared dynamic filter has been tightened by a sibling partition, a
lagging partition should be able to conclude early termination from a
strictly-greater batch prefix — without needing its own local heap to be full
first.
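A sketch of the expected decision, assuming all sort keys are ascending `i64`s, the input is sorted on the first `prefix_len` keys, and the prefix of the global threshold row is available to every partition. The function `can_complete` and its parameters are hypothetical. The key point is "strictly greater": a row whose prefix equals the threshold's prefix might still beat the threshold row on a trailing key, so only a strictly greater prefix allows termination.

```rust
/// Hypothetical early-completion decision.
///
/// * `global_prefix`: prefix of the shared threshold row, if any partition
///   has filled its heap.
/// * `local_max`: this partition's worst kept row (`None` until full).
/// * `latest_row`: a row from this partition's sorted input; every later row
///   has a prefix >= its prefix. Must have at least `prefix_len` keys.
pub fn can_complete(
    global_prefix: Option<&[i64]>,
    local_max: Option<&[i64]>,
    latest_row: &[i64],
    prefix_len: usize,
) -> bool {
    let prefix = &latest_row[..prefix_len];
    // 1. Global check: usable even while the local heap is still empty.
    //    Slices compare lexicographically.
    if let Some(g) = global_prefix {
        if prefix > g {
            return true;
        }
    }
    // 2. Fallback equivalent to the existing local `heap.max()` check.
    match local_max {
        Some(m) => prefix > &m[..prefix_len],
        None => false,
    }
}
```

With a global prefix of `[5]` and an empty local heap, a row `[6, 0]` allows termination, but `[5, 0]` does not.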
### Additional context
The clean fix is to cache the prefix projection of the global threshold row
on `TopKDynamicFilters` (next to the existing `threshold_row`), and let
`attempt_early_completion` consult that global prefix before falling back to
the `heap.max()` check. The shared-filter completion semantics (`mark_complete`
on `emit`) also need care so that one partition doesn't finalize the shared
filter while siblings can still tighten it.
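One way to address the completion concern is to finalize the shared filter only when the last partition finishes. This is a minimal sketch assuming each partition signals completion exactly once; `SharedCompletion` and `partition_done` are hypothetical names, not the `mark_complete` API or the follow-up PR's actual approach.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical completion tracker for a filter shared by N partitions.
pub struct SharedCompletion {
    remaining: AtomicUsize,
    complete: AtomicBool,
}

impl SharedCompletion {
    pub fn new(partitions: usize) -> Self {
        Self {
            remaining: AtomicUsize::new(partitions),
            complete: AtomicBool::new(false),
        }
    }

    /// Call exactly once per partition when it emits. Returns `true` only for
    /// the call that finalizes the shared filter (the last partition).
    pub fn partition_done(&self) -> bool {
        // `fetch_sub` returns the previous value; 1 means this was the last.
        if self.remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.complete.store(true, Ordering::Release);
            true
        } else {
            false
        }
    }

    pub fn is_complete(&self) -> bool {
        self.complete.load(Ordering::Acquire)
    }
}
```

Counting down rather than finalizing on the first `emit` means an early finisher cannot freeze a bound that slower siblings might still tighten.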
@geoffreyclaude has a follow-up PR ready that implements this; it will be
opened against apache shortly. The follow-up is independent of, and additive
to, #22852.