ajegou opened a new issue, #22849:
URL: https://github.com/apache/datafusion/issues/22849

   ### Describe the bug
   
   `TopK::insert_batch` (in `datafusion/physical-plan/src/topk/mod.rs`) calls 
`attempt_early_completion(&batch)` only inside the `if replacements > 0` branch, 
i.e. only when the batch successfully updated the heap. Earlier in the same 
function, `insert_batch` short-circuits with `return Ok(())` when the heap's 
dynamic filter rejects every row in the batch:
   
   ```rust
   if !filter.has_true() {
       // nothing to filter, so no need to update
       return Ok(());
   }
   ```
   
   The heap's dynamic filter (`TopKDynamicFilters`, updated by `update_filter`) 
is derived from the heap's worst row. A batch whose rows all come from a 
strictly worse sort prefix is rejected entirely by that filter, and it is also 
the case `attempt_early_completion` is designed to detect ("the next batch is 
past the heap's boundary, we can stop"). Because the early return precedes the 
check, and the check is additionally gated on `replacements > 0`, it never 
fires for those batches and `finished = true` is never set. Since `finished` is 
the signal that tells the surrounding stream to stop pulling from the input, 
the source keeps being polled long past the point where no further row can 
improve the heap.
   
   ### To Reproduce
   
   - Build a TopK over a 2-column schema `(a Int32, b Float64)` with `ORDER BY 
a ASC, b ASC LIMIT 3`. Input ordering is on `a` only (the prefix).
   - Batch 1: `a = [1, 1, 2]`, `b = [20.0, 15.0, 30.0]`. Fills the heap; 
`heap.max` becomes `(a=2, b=30.0)`. `update_filter` tightens the filter to `a < 
2 OR (a = 2 AND b < 30.0)`.
   - Batch 2: `a = [3, 3]`, `b = [10.0, 20.0]`. Every row has `a = 3`, so the 
filter rejects them all.
   
   After `insert_batch(batch2)`, expected state is `topk.finished == true` (the 
last row's prefix `a = 3` is strictly greater than the heap's worst-row prefix 
`a = 2`, so no future batch can improve the heap). Observed: `topk.finished == 
false`.
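   
   The two facts this reproduction relies on can be checked by hand. The sketch 
below is plain Rust with no DataFusion dependency. `HEAP_WORST`, 
`passes_filter`, `prefix_strictly_worse` and `batch2` are hypothetical names 
that hard-code the values from the steps above; they are not part of the 
codebase.
   
   ```rust
   /// Heap's worst row after batch 1, taken from the steps above: (a = 2, b = 30.0).
   const HEAP_WORST: (i32, f64) = (2, 30.0);

   /// Hypothetical helper: the tightened filter `a < 2 OR (a = 2 AND b < 30.0)`.
   fn passes_filter(a: i32, b: f64) -> bool {
       a < HEAP_WORST.0 || (a == HEAP_WORST.0 && b < HEAP_WORST.1)
   }

   /// Hypothetical helper: compares only the common sort prefix (`a`), the
   /// comparison this issue attributes to `attempt_early_completion`.
   fn prefix_strictly_worse(last_row_a: i32) -> bool {
       last_row_a > HEAP_WORST.0
   }

   /// Batch 2 from the reproduction, as (a, b) rows.
   fn batch2() -> Vec<(i32, f64)> {
       vec![(3, 10.0), (3, 20.0)]
   }
   ```
   
   Every row of batch 2 fails `passes_filter`, so the filter has no true entry 
and the early return is taken. At the same time `prefix_strictly_worse` holds 
for the last row (`a = 3`), which is the condition under which `finished` 
should be set.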
   
   ### Expected behavior
   
   `attempt_early_completion` should be invoked for every batch where its 
preconditions hold (heap full, common sort prefix declared), regardless of 
whether the batch updated the heap. A batch fully rejected by the heap's 
dynamic filter should still be able to set `finished = true` if its last-row 
prefix is strictly worse than the heap's worst.
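   
   As a rough illustration of the difference between the reported and the 
expected control flow, here is a self-contained toy model. It is a sketch under 
stated assumptions, not the actual implementation or a proposed patch: 
`ToyTopK`, its sorted `Vec` standing in for the heap, and the 
`check_rejected_batches` switch are all invented for illustration, and rows are 
simplified to `(a, b)` tuples with `a` as the common sort prefix.
   
   ```rust
   /// Toy stand-in for the operator state; this is not DataFusion's `TopK`.
   struct ToyTopK {
       k: usize,
       /// Kept rows, sorted ascending; the last element is the worst row.
       heap: Vec<(i32, f64)>,
       finished: bool,
       /// Illustration-only switch: `true` runs the completion check even when
       /// the filter rejects the whole batch; `false` models the reported flow.
       check_rejected_batches: bool,
   }

   impl ToyTopK {
       fn new(k: usize, check_rejected_batches: bool) -> Self {
           Self { k, heap: Vec::new(), finished: false, check_rejected_batches }
       }

       fn insert_batch(&mut self, batch: &[(i32, f64)]) {
           // Stand-in for the dynamic filter: once the heap is full, a row
           // passes only if it sorts strictly before the heap's worst row.
           let full = self.heap.len() == self.k;
           let worst = self.heap.last().copied();
           let passing: Vec<(i32, f64)> = batch
               .iter()
               .copied()
               .filter(|row| !full || worst.map_or(true, |w| *row < w))
               .collect();

           if passing.is_empty() {
               // Analogue of the `!filter.has_true()` early return.
               if self.check_rejected_batches {
                   self.attempt_early_completion(batch);
               }
               return;
           }

           let mut replacements = 0;
           for row in passing {
               let beats_worst = self.heap.last().map_or(true, |w| row < *w);
               if self.heap.len() < self.k || beats_worst {
                   self.heap.push(row);
                   self.heap
                       .sort_by(|x, y| x.partial_cmp(y).expect("no NaN in this sketch"));
                   self.heap.truncate(self.k);
                   replacements += 1;
               }
           }
           if replacements > 0 {
               self.attempt_early_completion(batch);
           }
       }

       /// Sets `finished` when the heap is full and the batch's last-row prefix
       /// is strictly greater than the prefix of the heap's worst row.
       fn attempt_early_completion(&mut self, batch: &[(i32, f64)]) {
           if self.heap.len() < self.k {
               return;
           }
           if let (Some(last), Some(worst)) = (batch.last(), self.heap.last()) {
               if last.0 > worst.0 {
                   self.finished = true;
               }
           }
       }
   }
   ```
   
   Feeding this model the two batches from the reproduction leaves `finished` 
at `false` when `check_rejected_batches` is `false` and sets it to `true` when 
the switch is on. The kept rows are identical in both cases, consistent with 
the output of TopK being unaffected.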
   
   ### Additional context
   
   The TopK partial-prefix early-termination optimization (the 
`attempt_early_completion` mechanism) was introduced by 
[#15563](https://github.com/apache/datafusion/pull/15563), closing 
[#15529](https://github.com/apache/datafusion/issues/15529). At that point 
there was no heap-derived dynamic filter on TopK; the natural and only call 
site for the check was right after a successful heap insertion.
   
   Two months later, [#15770](https://github.com/apache/datafusion/pull/15770) 
added dynamic-filter pushdown for TopK sorts. It introduced the 
`!filter.has_true()` short-circuit to handle batches the heap's filter rejects 
entirely. The two features address different problems and #15770 didn't connect 
its new short-circuit to #15563's check — which is how the gap this issue 
describes opened up.
   
   The symptom is a silent performance regression (output of TopK is unchanged; 
only the source-stopping signal fails to fire), which is likely why it went 
unreported for ~5 months: default test suites verify TopK output rather than 
asserting on `topk.finished` after specific batch patterns.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

