ajegou opened a new issue, #22874:
URL: https://github.com/apache/datafusion/issues/22874

   ### Describe the bug
   
   In multi-partition `SortExec`, each input partition gets its own `TopK` 
instance but they share a single `TopKDynamicFilters`. A partition that has not 
yet inserted `k` rows into its local heap (so `heap.max() == None`) can still 
receive batches that are entirely rejected by the shared filter — because a 
sibling partition has already filled its heap and tightened the global 
threshold.
   
   When this happens, `attempt_early_completion` cannot conclude termination 
because of an early guard:
   
   ```rust
   let Some(max_topk_row) = self.heap.max() else {
       return Ok(());
   };
   ```
   
   `heap.max()` only returns `Some` once the local heap is full, so the lagging 
partition can never bail this way. It keeps draining its sorted input even 
though no row from it can improve the global top-k.
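
   The effect of that guard can be shown with a toy model. This is only a sketch under stated assumptions: `LocalHeap` and `can_finish_early` are hypothetical names, sort keys are plain `i64` values sorted ascending, and none of this is the actual DataFusion `TopK` code.

```rust
use std::collections::BinaryHeap;

/// Hypothetical stand-in for a per-partition TopK heap (ascending sort,
/// keeps the k smallest keys). Not the real DataFusion type.
struct LocalHeap {
    k: usize,
    rows: BinaryHeap<i64>, // max-heap: the top is the worst row currently kept
}

impl LocalHeap {
    fn new(k: usize) -> Self {
        Self { k, rows: BinaryHeap::new() }
    }

    fn insert(&mut self, key: i64) {
        if self.rows.len() < self.k {
            self.rows.push(key);
        } else if let Some(&worst) = self.rows.peek() {
            if key < worst {
                self.rows.pop();
                self.rows.push(key);
            }
        }
    }

    /// Mirrors the behaviour described above: `Some` only once the heap is full.
    fn max(&self) -> Option<i64> {
        if self.rows.len() == self.k {
            self.rows.peek().copied()
        } else {
            None
        }
    }
}

/// Returns true when a partition reading ascending input may stop early.
/// With a non-full heap the guard returns before any comparison happens.
fn can_finish_early(heap: &LocalHeap, last_key_in_batch: i64) -> bool {
    let Some(max_row) = heap.max() else {
        return false;
    };
    last_key_in_batch > max_row
}
```

   In this model a partition whose heap never fills gets `false` for every batch, however large the keys it sees.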
   
   This is a separate failure mode from #22849 (which #22852 fixes). #22849 / 
#22852 cover the case where the local heap is full and the filter rejects an 
entire batch: the existing `heap.max()` check then succeeds and termination 
can be concluded. The case here is when the local heap is not yet full (often 
still empty): the local check has no max row to compare against, so the 
source-stopping signal isn't sent, regardless of how tight the shared filter 
has become.
   
   ### To Reproduce
   
   On a partition-preserving plan (`SortExec` with default partitioning) over 
an input sorted on the TopK's leading sort key:
   
   - Partition 0 receives the lowest range of the leading key, fills its heap 
to k, and tightens the shared filter to "rows beat heap.max".
   - Partitions 1..N receive higher leading-key ranges. Their first batches all 
fail the shared filter. Their local heaps stay empty.
   - Each lagging partition keeps reading from its source until exhausted.
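
   The three steps above can be imitated with a small simulation. It is a hypothetical model, not a reproduction against DataFusion: the shared filter is reduced to `key < shared_threshold`, batches are vectors of ascending `i64` keys, and `batches_read` is an illustrative name.

```rust
/// Counts how many batches a partition reads before it stops.
/// Assumptions (all hypothetical): ascending input, the shared filter is
/// `key < shared_threshold`, and the only early-exit check needs a full heap.
fn batches_read(shared_threshold: i64, k: usize, batches: &[Vec<i64>]) -> usize {
    // Input is ascending, so the first k rows that pass the filter are the
    // k smallest; a plain Vec is enough for this sketch.
    let mut local_heap: Vec<i64> = Vec::new();
    let mut read = 0;
    for batch in batches {
        read += 1;
        for &key in batch.iter().filter(|&&key| key < shared_threshold) {
            if local_heap.len() < k {
                local_heap.push(key);
            }
        }
        // Guard from the issue: without a full local heap there is no max
        // row, so the partition cannot conclude termination.
        if local_heap.len() == k {
            let max = local_heap.iter().copied().max();
            if let (Some(max), Some(&last)) = (max, batch.last()) {
                if last > max {
                    break;
                }
            }
        }
    }
    read
}
```

   With a threshold of 10 set by a sibling, a partition holding only keys of 100 and above never fills its heap in this model and reads every batch, while a partition holding low keys stops as soon as a batch ends past its own max.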
   
   Empirical signal on the `sort-tpch --sorted --limit 10` benchmark from 
#15563, comparing pre-#15770 (`0c3bb78e`) to post-#15770 + #22852 applied: 
queries with multi-key TopK on the sorted leading key (Q8, Q9, Q10) remain ~5× 
slower than pre-regression on default-partition runs. The same queries return 
to roughly pre-regression timing once the lagging-partition bail also works.
   
   ### Expected behavior
   
   When the shared dynamic filter has been tightened by a sibling partition, a 
lagging partition should be able to conclude early termination from a 
strictly-greater batch prefix — without needing its own local heap to be full 
first.
   
   ### Additional context
   
   The clean fix is to cache the prefix projection of the global threshold row 
on `TopKDynamicFilters` (next to the existing `threshold_row`), and let 
`attempt_early_completion` consult that global prefix before falling back to 
the `heap.max()` check. The shared-filter completion semantics (`mark_complete` 
on `emit`) also need care so that one partition doesn't finalize the shared 
filter while siblings can still tighten it.
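
   A minimal sketch of that idea, assuming a single `i64` leading key and ascending order. `SharedFilterState`, `threshold_prefix`, `tighten`, and `can_finish_early_with_global_prefix` are hypothetical names for illustration, not fields or methods of `TopKDynamicFilters`, and the locking needed to share the state across partitions is left out.

```rust
/// Hypothetical shared state: caches the leading-key prefix of the global
/// threshold row so that any partition can compare against it.
#[derive(Default)]
struct SharedFilterState {
    threshold_prefix: Option<i64>,
}

impl SharedFilterState {
    /// Only ever tightens (lowers) the cached prefix; looser candidates
    /// from slower partitions are ignored.
    fn tighten(&mut self, candidate: i64) {
        self.threshold_prefix = Some(match self.threshold_prefix {
            Some(current) => current.min(candidate),
            None => candidate,
        });
    }
}

/// Consults the global prefix first, then falls back to the local heap max.
/// `local_max_prefix` is `None` while the local heap is not yet full.
fn can_finish_early_with_global_prefix(
    shared: &SharedFilterState,
    local_max_prefix: Option<i64>,
    batch_last_prefix: i64,
) -> bool {
    // Input is sorted on the prefix, so once a batch ends strictly past the
    // global threshold prefix, no later row can enter the global top-k.
    if let Some(global) = shared.threshold_prefix {
        if batch_last_prefix > global {
            return true;
        }
    }
    match local_max_prefix {
        Some(local) => batch_last_prefix > local,
        None => false,
    }
}
```

   The comparison is strict on purpose: a batch that ends on a prefix equal to the threshold may still be followed by rows that tie on the prefix and win on later sort keys.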
   
   @geoffreyclaude has a follow-up PR ready that implements this; it will be 
opened against apache shortly. The follow-up is independent of and additive to 
#22852.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

