zhuqi-lucas opened a new pull request, #21712:
URL: https://github.com/apache/datafusion/pull/21712

   ## Which issue does this PR close?
   
   Closes #21691
   
   ## Rationale for this change
   
   TopK's dynamic filter starts as `lit(true)` (no filtering) and only tightens after enough rows have been processed to fill the heap. As a result, the row groups read before the heap fills are never pruned by the dynamic filter. For queries like `ORDER BY col DESC LIMIT 100` on parquet files with good statistics, we can compute a tight initial threshold **before reading any data**.
   
   ## What changes are included in this PR?
   
   **Core implementation** (`datafusion/datasource-parquet/src/opener.rs`):
   
   Three new functions:
   - `try_init_topk_threshold()`: main function that finds the dynamic filter in the predicate, scans the row group (RG) statistics, computes the best threshold, and calls `update()` on the `DynamicFilterPhysicalExpr`
   - `find_dynamic_filter()`: recursively walks the predicate tree to find a `DynamicFilterPhysicalExpr`
   - `compute_best_threshold_from_stats()`: iterates over the row groups and returns the tightest threshold their statistics support
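A minimal sketch of the kind of recursive search `find_dynamic_filter()` performs, written against a toy expression enum rather than DataFusion's real `PhysicalExpr` trait (the `Expr` type, its variants, and its `children()` method are hypothetical). In this sketch the search is depth-first and returns the first dynamic filter node it meets as a clone of the same `Arc`, so a caller would update the shared node rather than a copy.

```rust
use std::sync::Arc;

/// Toy physical expression tree (hypothetical; not DataFusion's `PhysicalExpr`).
#[derive(Debug)]
enum Expr {
    Column(String),
    Literal(i64),
    And(Arc<Expr>, Arc<Expr>),
    Gt(Arc<Expr>, Arc<Expr>),
    /// Stand-in for a `DynamicFilterPhysicalExpr`; the id only tells nodes apart.
    Dynamic(u32),
}

impl Expr {
    /// Direct children of this node (leaves have none).
    fn children(&self) -> Vec<&Arc<Expr>> {
        match self {
            Expr::And(l, r) | Expr::Gt(l, r) => vec![l, r],
            _ => vec![],
        }
    }
}

/// Depth-first search; returns the first dynamic filter node found, as a
/// clone of the same `Arc` (a new handle, not a copy of the node).
fn find_dynamic_filter(expr: &Arc<Expr>) -> Option<Arc<Expr>> {
    if matches!(**expr, Expr::Dynamic(_)) {
        return Some(Arc::clone(expr));
    }
    expr.children().into_iter().find_map(find_dynamic_filter)
}
```

For a predicate such as `a > 1 AND <dynamic>`, the search descends through the conjunction and returns the dynamic node; a predicate with no dynamic node yields `None`.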
   
   **Algorithm** (single-column sort):
   - **DESC LIMIT K**: `threshold = max(min)` across RGs where `num_rows >= K`, 
filter: `col > threshold`
   - **ASC LIMIT K**: `threshold = min(max)` across RGs where `num_rows >= K`, 
filter: `col < threshold`
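The threshold selection above can be sketched over plain per-row-group statistics. This is an illustration under stated assumptions, not the PR's code: `RowGroupStats` is a hypothetical struct (the real code reads parquet metadata), values are assumed to be `i64`, a missing min/max is modeled as `None` and skipped, and the sort column is assumed to contain no nulls so that `num_rows` counts exactly the values the min/max describe.

```rust
/// Hypothetical per-row-group statistics for the sort column.
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    num_rows: usize,
    min: Option<i64>, // None: statistic not available
    max: Option<i64>,
}

/// DESC LIMIT k: max(min) over row groups with num_rows >= k.
/// ASC  LIMIT k: min(max) over row groups with num_rows >= k.
/// Returns None when no row group qualifies.
fn compute_best_threshold_from_stats(
    stats: &[RowGroupStats],
    k: usize,
    descending: bool,
) -> Option<i64> {
    let qualifying = stats.iter().filter(|rg| rg.num_rows >= k);
    if descending {
        // Assuming no nulls, each qualifying row group holds at least k values
        // >= its min, so the k-th largest value overall is at least that min.
        qualifying.filter_map(|rg| rg.min).max()
    } else {
        // Symmetric: at least k values <= each qualifying row group's max.
        qualifying.filter_map(|rg| rg.max).min()
    }
}
```

For example, given two 200-row groups with ranges [10, 50] and [60, 90], `DESC LIMIT 100` yields 60 (the second row group alone supplies 200 values of at least 60) and `ASC LIMIT 100` yields 50. A row group with fewer than `k` rows cannot bound the k-th value on its own, which is why it is skipped.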
   
   `try_init_topk_threshold()` is called in `build_stream()` after the file metadata is loaded but before the row filter is constructed, so the updated threshold is used both by the row filter for the current file and by RG pruning for subsequent files.
   
   The `DynamicFilterPhysicalExpr` is shared across all partitions, so a threshold update made while opening one file is visible to every partition.
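The cross-partition visibility relies on every partition holding a handle to the same shared object. A minimal sketch with a hypothetical `SharedThreshold` type, standing in for `DynamicFilterPhysicalExpr` and built on the standard library's `Arc<RwLock<_>>`, shows an update made through one clone being read through another. In this sketch `update` simply replaces the stored value.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the shared dynamic filter: all clones point at
/// the same `RwLock`, so an update through one clone is seen by all of them.
#[derive(Clone, Default)]
struct SharedThreshold(Arc<RwLock<Option<i64>>>);

impl SharedThreshold {
    /// Replace the current threshold (None means "no filtering yet").
    fn update(&self, threshold: i64) {
        *self.0.write().unwrap() = Some(threshold);
    }

    /// Read the threshold as currently visible to this partition.
    fn current(&self) -> Option<i64> {
        *self.0.read().unwrap()
    }
}
```

If one partition's opener calls `update(60)` on its clone, another partition reading `current()` afterwards sees `Some(60)`.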
   
   **Graceful fallback**: initialization is skipped when statistics are unavailable, the sort column is not found, the sort has multiple columns, or no RG qualifies.
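The fallback rules map naturally onto early returns. This sketch uses hypothetical `SortKey` and `RowGroup` types and returns an `Option<i64>` threshold in place of the real DataFusion and parquet types; `None` stands for "leave the dynamic filter at `lit(true)`". The same no-nulls assumption as in the threshold sketch applies.

```rust
/// Hypothetical sort key: one column plus direction.
struct SortKey<'a> {
    column: &'a str,
    descending: bool,
}

/// Hypothetical per-row-group statistics for the sort column.
struct RowGroup {
    num_rows: usize,
    min: Option<i64>,
    max: Option<i64>,
}

/// Returns the initial threshold, or `None` to leave the dynamic filter as-is.
fn try_init_topk_threshold(
    sort_keys: &[SortKey],
    file_columns: &[&str],
    stats: Option<&[RowGroup]>, // None: file has no statistics for the column
    k: usize,
) -> Option<i64> {
    // Multi-column (or empty) sort: skip.
    let [key] = sort_keys else { return None };
    // Sort column not present in this file: skip.
    if !file_columns.contains(&key.column) {
        return None;
    }
    // Statistics unavailable: skip.
    let stats = stats?;
    let qualifying = stats.iter().filter(|rg| rg.num_rows >= k);
    // No qualifying row group (or none with usable stats): max()/min() give None.
    if key.descending {
        qualifying.filter_map(|rg| rg.min).max()
    } else {
        qualifying.filter_map(|rg| rg.max).min()
    }
}
```

Keeping every bail-out as an early `return None` or `?` makes each skip condition visible in one place and guarantees that no partial threshold is ever produced.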
   
   ## Are these changes tested?
   
   **13 unit tests** in `opener.rs`:
   - `compute_best_threshold_from_stats`: 6 tests (DESC/ASC, skip small RGs, 
skip nulls, all too small, empty)
   - `find_dynamic_filter`: 3 tests (direct match, nested in conjunction, 
absent)
   - `try_init_topk_threshold`: 4 tests (DESC, ASC, no dynamic filter, 
multi-column skip)
   
   **SLT tests** in `sort_pushdown.slt` (Test H series):
   - Scrambled multi-file parquet data with non-overlapping ranges
   - DESC LIMIT, ASC LIMIT, larger LIMIT spanning files
   - Multi-partition correctness
   - EXPLAIN plan verification
   
   ## Are there any user-facing changes?
   
   No. This is a transparent optimization — same results, potentially faster 
TopK queries on parquet data with statistics.
   
   ## Future work
   
   - Support multi-column sort threshold initialization
   - Compute a global threshold from all files' statistics before opening any file (currently the threshold is computed per file)
   - Attach sort options to `DynamicFilterPhysicalExpr` to support all TopK 
queries, not just the Inexact sort pushdown path


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
