geoffreyclaude opened a new pull request, #22991:
URL: https://github.com/apache/datafusion/pull/22991

   ## Which issue does this PR close?
   
   Closes #22874.
   
   Follow-up to #22852.
   
   ## Rationale for this change
   
   PR #22852 fixes the local all-filtered-batch path by calling 
`attempt_early_completion` before returning. The remaining regression is in 
partitioned `SortExec`: every local `TopK` shares one `TopKDynamicFilters`. One 
partition can establish a global threshold before another partition has filled 
its local heap. The second partition then sees fully rejected batches, but 
`heap.max()` is still `None` locally, so it cannot prove completion and keeps 
draining sorted input.
   
   This patch stores the common-prefix row for the shared global threshold, and 
each local `TopK` checks that shared prefix before falling back to its local 
heap prefix. It also prevents local partition `TopK`s from marking a shared 
dynamic filter complete while sibling partitions can still tighten it. 
Single-partition behavior is unchanged.
   
   ## What changes are included in this PR?
   
   - Store a global common-prefix threshold row in `TopKDynamicFilters`.
   - Check that global prefix threshold in `attempt_early_completion` before 
local heap fallback.
   - Keep the PR #22852 all-filtered-batch completion call.
   - Also check completion on batches that pass the filter but produce zero 
heap replacements.
   - Avoid marking shared partitioned `TopK` filters complete from individual 
local partitions.
   - Add tests for shared-filter completion before local heap fill, 
equal-prefix non-completion, missing-prefix non-completion, and DESC/null 
prefix ordering.
   
   ## Are these changes tested?
   
   Commands run on the final rebased branch:
   
   - `cargo fmt --all`
   - `cargo test -p datafusion-physical-plan topk --lib`
   - `cargo test -p datafusion-physical-plan sort --lib`
   - `cargo clippy -p datafusion-physical-plan --lib -- -D warnings`
   - `cargo clippy --all-targets --all-features -- -D warnings`
   - `cargo build --release --bin dfbench`
   
   Benchmark command:
   
   `target/release/dfbench sort-tpch --sorted --limit 10 --iterations 5 --path 
/tmp/df-topk-bench-data/tpch_sf1 -o 
/tmp/df-patched-rerun2-top10_sorted_tpch.json`
   
   Key comparisons from the clean reruns:
   
   - Patched vs pre-#15770 v48: Q8 5.60 ms to 3.98 ms, Q9 7.77 ms to 5.89 ms, 
Q10 10.20 ms to 8.16 ms.
   - Patched vs PR #22852 fix: Q8 28.48 ms to 3.98 ms, Q9 39.47 ms to 5.89 ms, 
Q10 57.70 ms to 8.16 ms.
   
   Debug proof for the bounded-read shape after this patch:
   
   - Q8: `DataSourceExec` output_rows=81.92K, output_batches=10, 
files_processed=0, bytes_scanned=15.79M.
   - Q9: `DataSourceExec` output_rows=81.92K, output_batches=10, 
files_processed=0, bytes_scanned=20.89M.
   - Q10: `DataSourceExec` output_rows=81.92K, output_batches=10, 
files_processed=0, bytes_scanned=34.69M.
   
   Those Q8/Q9/Q10 debug runs show the scan returns to one batch per partition 
instead of draining millions of rows across the remaining file ranges.
   
   ## Are there any user-facing changes?
   
   No. This is an internal physical execution optimization fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to