mdashti opened a new issue, #20425: URL: https://github.com/apache/datafusion/issues/20425
### Is your feature request related to a problem or challenge?

When `SortExec` has a fetch limit (TopK mode) and its input is already sorted, it short-circuits to a simple `LimitStream` that just takes the first N rows. This is a correct optimization for producing results, but it skips the TopK heap implementation entirely, which means it **never creates or updates a `DynamicFilterPhysicalExpr`**.

This is a problem for custom `ExecutionPlan` implementations (e.g. table scans backed by external indexes) that accept dynamic filters to prune rows before materialization. These scans rely on TopK progressively tightening its threshold as it processes batches, but when the input happens to be sorted, the dynamic filter is never produced and the scan reads every row.

This commonly happens with `SortMergeJoinExec`: the join output is sorted by the join keys, so a parent `SortExec(TopK)` on the same keys detects the ordering and falls back to `LimitStream`. The result is that the entire join is materialized before the limit is applied, even though the scan nodes could have pruned most rows in advance.

### Describe the solution you'd like

`SortExec` in TopK mode should always create and update its `DynamicFilterPhysicalExpr`, regardless of whether the input is already sorted. Even when the sort itself is a no-op, the dynamic filter it produces is valuable for operators beneath it in the plan that support filter pushdown.

Concretely:

- When the input is sorted and fetch is set, `SortExec` should still use its TopK heap (or a lightweight equivalent) to track the current threshold and update the dynamic filter expression between batches.
- The `DynamicFilterPhysicalExpr` should still be pushed down through the `FilterPushdown` framework, allowing leaf nodes to use it for pruning.
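
To make the "lightweight equivalent" idea more concrete, here is a minimal std-only sketch, not DataFusion code. It assumes a single ascending `i64` sort key, and the names `SharedThreshold` and `SortedTopKTracker` are hypothetical stand-ins for the dynamic filter and the per-stream state. Because the input is already sorted, no heap is needed: the key of the fetch-th row is the final threshold.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the threshold carried by a dynamic filter.
/// `None` means "no bound known yet, keep every row".
#[derive(Clone, Default)]
struct SharedThreshold(Arc<RwLock<Option<i64>>>);

impl SharedThreshold {
    fn get(&self) -> Option<i64> {
        *self.0.read().unwrap()
    }

    fn set(&self, value: i64) {
        *self.0.write().unwrap() = Some(value);
    }

    /// What a scan could ask before materializing a row: can a row with
    /// this key still enter the top N? Ties with the threshold cannot.
    fn may_contribute(&self, key: i64) -> bool {
        match self.get() {
            None => true,
            Some(threshold) => key < threshold,
        }
    }
}

/// Hypothetical tracker for ascending-sorted input with a fetch limit.
struct SortedTopKTracker {
    fetch: usize,
    seen: usize,
    threshold: SharedThreshold,
}

impl SortedTopKTracker {
    fn new(fetch: usize, threshold: SharedThreshold) -> Self {
        SortedTopKTracker { fetch, seen: 0, threshold }
    }

    /// Observe one batch of ascending keys and return how many of its
    /// rows should be emitted. Publishes the threshold once the
    /// fetch-th row has been seen.
    fn observe_batch(&mut self, keys: &[i64]) -> usize {
        let remaining = self.fetch - self.seen;
        let take = remaining.min(keys.len());
        self.seen += take;
        if take > 0 && self.seen == self.fetch {
            // The last row taken is the fetch-th row overall.
            self.threshold.set(keys[take - 1]);
        }
        take
    }
}
```

In this model the scan holds a clone of the same `SharedThreshold` and calls `may_contribute` before reading a row, which is the role the pushed-down dynamic filter would play in a real plan.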

### Describe alternatives you've considered

Our current workaround (in https://github.com/paradedb/paradedb/pull/4193) is a custom `StripOrderingExec` wrapper that hides the input ordering from `SortExec`, forcing it into its full TopK code path. `StripOrderingExec` then intercepts the dynamic filter in `handle_child_pushdown_result` and manually injects it into our custom scan nodes.

This works but has several drawbacks:

- It's fragile: it defeats DataFusion's ordering-aware optimizations and requires a second `FilterPushdown` pass after our custom optimizer rule.
- It requires interior mutability (`Arc<RwLock<...>>`) to set filters on scan nodes without triggering `with_new_children` rebuilds on ancestors (which would create a new `DynamicFilterPhysicalExpr` and break the shared reference from TopK).
- It duplicates logic that DataFusion already has: the TopK heap and the dynamic filter infrastructure.

A related improvement would be for `SortMergeJoinExec` to support `FilterPushdown` for `DynamicFilterPhysicalExpr`, so that when TopK does produce a dynamic filter, it can naturally reach leaf scan nodes through the join. Today, `SortMergeJoinExec` doesn't participate in filter pushdown at all, so injecting filters requires manual tree traversal.

### Additional context

The scenario in full (taken from [join_custom_scan.out#L3436-L3443](https://github.com/paradedb/paradedb/blob/591ff1b4d9f4c72620b2ba3de3f684da3138e9d3/pg_search/tests/pg_regress/expected/join_custom_scan.out#L3436-L3443)):

```
GlobalLimitExec(fetch=10)
  SortMergeJoinExec(on=[id])       ← output is sorted by id
    CustomScan(sorted by id)       ← could prune rows with dynamic filter
    CustomScan(sorted by id)       ← could prune rows with dynamic filter
```

DataFusion's physical optimizer sees that the join output is already sorted and eliminates the `SortExec`, leaving only `GlobalLimitExec`. Since `GlobalLimitExec` doesn't produce dynamic filters, the custom scans read all rows.

Our workaround transforms this into:

```
SortExec(TopK, fetch=10)
  StripOrderingExec                ← hides ordering so TopK uses its heap
    SortMergeJoinExec(on=[id])
      CustomScan(dynamic_filters=1)
      CustomScan(dynamic_filters=1)
```

Ideally, DataFusion would handle this natively, either by having `GlobalLimitExec` produce dynamic filters when its child is sorted, or by keeping `SortExec(TopK)` in the plan and having it update the dynamic filter even when the sort is a no-op.
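
For readers unfamiliar with the shared-reference constraint mentioned among the workaround's drawbacks, the following std-only sketch models it. It does not use DataFusion's API: `Filter`, `Scan`, and `TopK` are hypothetical stand-ins, and constructing a second `TopK` plays the role of an ancestor rebuild that allocates a fresh dynamic filter.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a dynamic filter: a threshold that the
/// TopK side tightens and the scan side reads.
type Filter = Arc<RwLock<Option<i64>>>;

/// Hypothetical scan node whose filter slot can be set through `&self`,
/// so no ancestor has to be rebuilt to install the filter.
struct Scan {
    filter_slot: RwLock<Option<Filter>>,
}

impl Scan {
    fn new() -> Self {
        Scan { filter_slot: RwLock::new(None) }
    }

    /// Install a filter in place (interior mutability).
    fn inject(&self, filter: Filter) {
        *self.filter_slot.write().unwrap() = Some(filter);
    }

    /// Read the threshold of the injected filter, if any.
    fn current_threshold(&self) -> Option<i64> {
        let filter = self.filter_slot.read().unwrap().clone()?;
        let threshold = *filter.read().unwrap();
        threshold
    }
}

/// Hypothetical TopK operator that owns the filter it updates.
struct TopK {
    filter: Filter,
}

impl TopK {
    /// Each construction allocates a fresh filter, which models what a
    /// rebuild of the operator would do.
    fn new() -> Self {
        TopK { filter: Arc::new(RwLock::new(None)) }
    }

    fn tighten(&self, threshold: i64) {
        *self.filter.write().unwrap() = Some(threshold);
    }
}
```

As long as the scan holds a clone of the same `Arc` that `TopK` writes to, updates are visible to it. If the `TopK` node is recreated, its new filter is a different allocation and the scan keeps reading the stale one, which is why the workaround avoids rebuilding ancestors.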
