mdashti opened a new issue, #20425:
URL: https://github.com/apache/datafusion/issues/20425

   ### Is your feature request related to a problem or challenge?
   
   When `SortExec` has a fetch limit (TopK mode) and its input is already 
sorted, it short-circuits to a simple `LimitStream` that just takes the first N 
rows. This is a correct optimization for producing results — but it skips the 
TopK heap implementation entirely, which means it **never creates or updates a 
`DynamicFilterPhysicalExpr`**.
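The dispatch described above can be modeled as a match on two things: whether the input ordering already satisfies the sort, and whether a fetch is set. This is a simplified sketch, not DataFusion's code. `SortStrategy`, `choose_strategy` and `updates_dynamic_filter` are hypothetical names, and the no-fetch branches are only there for completeness. The point it illustrates is that in this model only the TopK branch ever touches a dynamic filter.

```rust
/// Hypothetical, simplified model of how a sort operator picks a strategy.
/// These names are illustrative and are not DataFusion types.
#[derive(Debug, PartialEq)]
pub enum SortStrategy {
    /// Input already sorted, no fetch: pass the input through unchanged.
    PassThrough,
    /// Input already sorted with a fetch: take the first `n` rows (LimitStream-like).
    Limit(usize),
    /// Input not sorted with a fetch: TopK heap, which also maintains the dynamic filter.
    TopK(usize),
    /// Input not sorted, no fetch: full sort.
    FullSort,
}

/// Choose a strategy from (is the ordering already satisfied?, optional fetch).
pub fn choose_strategy(sort_satisfied: bool, fetch: Option<usize>) -> SortStrategy {
    match (sort_satisfied, fetch) {
        (true, None) => SortStrategy::PassThrough,
        (true, Some(n)) => SortStrategy::Limit(n),
        (false, Some(n)) => SortStrategy::TopK(n),
        (false, None) => SortStrategy::FullSort,
    }
}

/// In this model, only the TopK path creates or updates a dynamic filter.
pub fn updates_dynamic_filter(strategy: &SortStrategy) -> bool {
    matches!(strategy, SortStrategy::TopK(_))
}
```

With sorted input and `fetch = 10`, the model picks `Limit(10)`, so no dynamic filter is produced. That is exactly the gap this issue describes.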
   
   This is a problem for custom `ExecutionPlan` implementations (e.g. table 
scans backed by external indexes) that accept dynamic filters to prune rows 
before materialization. These scans rely on TopK progressively tightening its 
threshold as it processes batches, but when the input happens to be sorted, the 
dynamic filter is never produced and the scan reads every row.
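Here is a concrete picture of "progressively tightening its threshold". For `ORDER BY key ASC LIMIT k`, a TopK operator can keep the k smallest keys seen so far in a max-heap. Once the heap is full, its top is a bound that any later row must beat. The sketch below is a hypothetical model over `i64` keys. `TopKThreshold` and `may_qualify` are illustrative names, not DataFusion APIs. DataFusion's real TopK also handles multi-column keys, nulls, and Arrow batches.

```rust
use std::collections::BinaryHeap;

/// Hypothetical TopK tracker for `ORDER BY key ASC LIMIT k` over i64 keys.
/// The max-heap holds the k smallest keys seen so far. Once it is full, its
/// top is the threshold: a row with key >= threshold cannot enter the result.
pub struct TopKThreshold {
    k: usize,
    heap: BinaryHeap<i64>,
}

impl TopKThreshold {
    pub fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::with_capacity(k) }
    }

    /// Feed one batch of keys and return the threshold after the batch, if any.
    pub fn push_batch(&mut self, keys: &[i64]) -> Option<i64> {
        for &key in keys {
            if self.heap.len() < self.k {
                // Heap not full yet: every key is a candidate.
                self.heap.push(key);
            } else if self.heap.peek().map_or(false, |&top| key < top) {
                // Key beats the current worst candidate: replace it.
                self.heap.pop();
                self.heap.push(key);
            }
        }
        self.threshold()
    }

    /// The current bound. It only exists once k candidates have been seen.
    pub fn threshold(&self) -> Option<i64> {
        if self.heap.len() == self.k { self.heap.peek().copied() } else { None }
    }
}

/// The predicate a scan could apply with the current threshold.
/// With no threshold yet, every row may still qualify.
pub fn may_qualify(key: i64, threshold: Option<i64>) -> bool {
    threshold.map_or(true, |t| key < t)
}
```

Each batch can only lower the threshold, so a scan that re-reads it between batches prunes more and more rows as the query runs.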
   
   This commonly happens with `SortMergeJoinExec`: the join output is sorted by 
the join keys, so a parent `SortExec(TopK)` on the same keys detects the 
ordering and falls back to `LimitStream`. The result is that the entire join is 
materialized before applying the limit, even though the scan nodes could have 
pruned most rows in advance.
   
   
   ### Describe the solution you'd like
   
   `SortExec` in TopK mode should always create and update its 
`DynamicFilterPhysicalExpr`, regardless of whether the input is already sorted. 
Even when the sort itself is a no-op, the dynamic filter it produces is 
valuable for downstream operators that support filter pushdown.
   
   Concretely:
   - When input is sorted and fetch is set, `SortExec` should still use its 
TopK heap (or a lightweight equivalent) to track the current threshold and 
update the dynamic filter expression between batches.
   - The `DynamicFilterPhysicalExpr` should still be pushed down through the 
`FilterPushdown` framework, allowing leaf nodes to use it for pruning.
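When the input is already sorted, the "lightweight equivalent" mentioned above can be much simpler than a heap. The first k rows *are* the result, so the key of the k-th row is the final threshold, and no later row can improve it. A minimal sketch under stated assumptions: the filter is modeled as a hypothetical shared slot (`SharedBound`, an `Arc<RwLock<Option<i64>>>`) rather than DataFusion's `DynamicFilterPhysicalExpr`, and `SortedLimitTracker` is an illustrative name.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a dynamic filter. `None` means "no bound yet";
/// `Some(t)` means "rows with key > t cannot be in the result".
pub type SharedBound = Arc<RwLock<Option<i64>>>;

/// Lightweight TopK for input already sorted ascending by key: pass through
/// the first k rows and publish the k-th key as the final bound.
pub struct SortedLimitTracker {
    remaining: usize,
    bound: SharedBound,
}

impl SortedLimitTracker {
    pub fn new(k: usize, bound: SharedBound) -> Self {
        Self { remaining: k, bound }
    }

    /// Consume one sorted batch and return the rows that pass the limit.
    pub fn push_batch(&mut self, keys: &[i64]) -> Vec<i64> {
        let take = keys.len().min(self.remaining);
        let out = keys[..take].to_vec();
        self.remaining -= take;
        if self.remaining == 0 && take > 0 {
            // The k-th row was just emitted, so its key is the final bound.
            *self.bound.write().unwrap() = Some(out[take - 1]);
        }
        out
    }
}
```

This keeps the cost of the sorted path close to a plain limit (a counter and one write), while still giving downstream scans a bound to prune with.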
   
   ### Describe alternatives you've considered
   
   Our current workaround (in https://github.com/paradedb/paradedb/pull/4193) 
is a custom `StripOrderingExec` wrapper that hides the input ordering from 
`SortExec`, forcing it into its full TopK code path. `StripOrderingExec` then 
intercepts the dynamic filter in `handle_child_pushdown_result` and manually 
injects it into our custom scan nodes.
   
   This works but has several drawbacks:
   - It's fragile — it defeats DataFusion's ordering-aware optimizations and 
requires a second `FilterPushdown` pass after our custom optimizer rule.
   - It requires interior mutability (`Arc<RwLock<...>>`) to set filters on 
scan nodes without triggering `with_new_children` rebuilds on ancestors (which 
would create a new `DynamicFilterPhysicalExpr` and break the shared reference 
from TopK).
   - It duplicates logic that DataFusion already has — the TopK heap and 
dynamic filter infrastructure.
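The second drawback hinges on reference identity. A dynamic filter only works if the producer (TopK) and the consumer (the scan) hold the *same* shared object. A rebuild that allocates a fresh filter silently disconnects them: the producer keeps updating the old object, and whoever holds the new one never sees a bound. The minimal sketch below uses a hypothetical `DynFilter` built on `Arc<RwLock<...>>`, not DataFusion's `DynamicFilterPhysicalExpr`, to show the effect.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a dynamic filter: a shared, mutable bound.
#[derive(Debug, Default)]
pub struct DynFilter {
    bound: RwLock<Option<i64>>,
}

impl DynFilter {
    /// Called by the producer (e.g. TopK) when its threshold tightens.
    pub fn update(&self, t: i64) {
        *self.bound.write().unwrap() = Some(t);
    }

    /// Called by the consumer (e.g. a scan) before reading each batch.
    pub fn current(&self) -> Option<i64> {
        *self.bound.read().unwrap()
    }
}

/// Producer and consumer hold clones of the same Arc, so updates are visible.
pub fn shared_pair() -> (Arc<DynFilter>, Arc<DynFilter>) {
    let f = Arc::new(DynFilter::default());
    (Arc::clone(&f), f)
}

/// What a rebuild that constructs a new filter amounts to: a separate
/// allocation that the existing producer never writes to.
pub fn rebuild_with_fresh_filter() -> Arc<DynFilter> {
    Arc::new(DynFilter::default())
}
```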
   
   A related improvement would be for `SortMergeJoinExec` to support 
`FilterPushdown` for `DynamicFilterPhysicalExpr`, so that once TopK produces a 
dynamic filter, it can reach the leaf scan nodes through the join without 
manual injection. Today, `SortMergeJoinExec` doesn't participate in filter 
pushdown at all, so filters have to be injected by traversing the tree manually.
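For an inner equi-join such as `SortMergeJoinExec(on=[id])`, a bound on the output's `id` is also a bound on each input's `id`, because every output row pairs equal keys from both sides. The hypothetical sketch below encodes that rule. `JoinType`, `KeyUpperBound` and `push_key_bound` are illustrative names and are not DataFusion's pushdown API. It conservatively refuses to push through outer joins, where NULL-extended rows need more careful reasoning.

```rust
/// Hypothetical, simplified join types for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum JoinType {
    Inner,
    LeftOuter,
    RightOuter,
    FullOuter,
}

/// A bound on the join key: rows with key > `max` cannot reach the output.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct KeyUpperBound {
    pub max: i64,
}

/// Decide which inputs of `left.id = right.id` may receive a bound that was
/// placed on the join's output key. Returns (left_filter, right_filter).
pub fn push_key_bound(
    join: JoinType,
    bound: KeyUpperBound,
) -> (Option<KeyUpperBound>, Option<KeyUpperBound>) {
    match join {
        // Every output row has left.id == right.id, so the bound holds on both sides.
        JoinType::Inner => (Some(bound), Some(bound)),
        // Conservative: outer joins emit NULL-extended rows; do not push in this sketch.
        JoinType::LeftOuter | JoinType::RightOuter | JoinType::FullOuter => (None, None),
    }
}
```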
   
   
   ### Additional context
   
   The full scenario (adapted from 
[join_custom_scan.out#L3436-L3443](https://github.com/paradedb/paradedb/blob/591ff1b4d9f4c72620b2ba3de3f684da3138e9d3/pg_search/tests/pg_regress/expected/join_custom_scan.out#L3436-L3443)):
   
   ```
   GlobalLimitExec(fetch=10)
     SortMergeJoinExec(on=[id])      ← output is sorted by id
       CustomScan(sorted by id)       ← could prune rows with dynamic filter
       CustomScan(sorted by id)       ← could prune rows with dynamic filter
   ```
   
   DataFusion's physical optimizer sees that the join output is already sorted 
and eliminates the `SortExec`, leaving only `GlobalLimitExec`. Since 
`GlobalLimitExec` doesn't produce dynamic filters, the custom scans read all 
rows.
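The cost difference is easy to see for a scan whose data is sorted by `id`. With a bound on `id`, it can filter rows and stop at the first batch that starts past the bound. Without one, it must read everything. The sketch below is hypothetical: `scan_sorted` and the closure used to re-read the bound before each batch are illustrative assumptions, not the ParadeDB or DataFusion scan API.

```rust
/// Hypothetical scan over data sorted ascending by `id`, split into batches.
/// `bound` returns the current dynamic-filter threshold (rows with id > bound
/// are not needed). It is re-read before every batch, as a dynamic filter would be.
/// Returns (rows emitted, rows read).
pub fn scan_sorted<F>(batches: &[Vec<i64>], mut bound: F) -> (Vec<i64>, usize)
where
    F: FnMut() -> Option<i64>,
{
    let mut out = Vec::new();
    let mut rows_read = 0;
    for batch in batches {
        let b = bound();
        // Sorted data: if this batch already starts past the bound, every
        // later batch does too, so stop reading entirely.
        if let (Some(t), Some(&first)) = (b, batch.first()) {
            if first > t {
                break;
            }
        }
        rows_read += batch.len();
        // Within a batch, drop the rows that are past the bound.
        out.extend(batch.iter().copied().filter(|&id| b.map_or(true, |t| id <= t)));
    }
    (out, rows_read)
}
```

With batches `[1, 2, 3]`, `[4, 5, 6]`, `[7, 8, 9]` and a bound of `4`, this sketch reads 6 rows and emits `[1, 2, 3, 4]`. With no bound, it reads all 9.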
   
   Our workaround transforms this into:
   
   ```
   SortExec(TopK, fetch=10)
     StripOrderingExec                ← hides ordering so TopK uses its heap
       SortMergeJoinExec(on=[id])
         CustomScan(dynamic_filters=1)
         CustomScan(dynamic_filters=1)
   ```
   
   Ideally, DataFusion would handle this natively — either by having 
`GlobalLimitExec` produce dynamic filters when its child is sorted, or by 
keeping `SortExec(TopK)` in the plan and having it update the dynamic filter 
even when the sort is a no-op.
   

