adriangb opened a new pull request, #23064:
URL: https://github.com/apache/datafusion/pull/23064
## Which issue does this PR close?
- Relates to #20443.
- Extracted from the abandoned #21267.
## Rationale for this change
This adds support for generating dynamic filters for the left and right
sides of a `SortMergeJoinExec`, enabling range-based pruning of both sides of
the join (for `Inner` / `LeftSemi` / `RightSemi`). Consumers that support
filter pushdown (e.g. Parquet scans) can use the pushed-down filter to prune at
scan time.
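
Here is a rough illustration of what range-based pruning gives a pushdown-capable scan. Given a published `col >= lo AND col <= hi` predicate, a scan can skip any chunk whose min/max statistics fall entirely outside `[lo, hi]`. In Parquet, such a chunk would be a row group. This is a toy model. `ChunkStats`, `RangePredicate`, and `chunks_to_read` are hypothetical names, not DataFusion or Parquet APIs.

```rust
/// Hypothetical per-chunk statistics, standing in for e.g. Parquet row-group min/max.
#[derive(Debug, Clone, Copy)]
struct ChunkStats {
    min: i64,
    max: i64,
}

/// A `col >= lo AND col <= hi` range predicate, like the one a dynamic filter publishes.
#[derive(Debug, Clone, Copy)]
struct RangePredicate {
    lo: i64,
    hi: i64,
}

impl RangePredicate {
    /// A chunk can be skipped only when its whole [min, max] range lies outside [lo, hi].
    fn may_match(&self, stats: &ChunkStats) -> bool {
        stats.max >= self.lo && stats.min <= self.hi
    }
}

/// Returns the indices of the chunks a scan would still have to read.
fn chunks_to_read(chunks: &[ChunkStats], pred: &RangePredicate) -> Vec<usize> {
    chunks
        .iter()
        .enumerate()
        .filter(|(_, s)| pred.may_match(s))
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    let chunks = [
        ChunkStats { min: 0, max: 9 },
        ChunkStats { min: 10, max: 19 },
        ChunkStats { min: 20, max: 29 },
    ];
    // Suppose the other join side only contains keys in [12, 15].
    let pred = RangePredicate { lo: 12, hi: 15 };
    println!("{:?}", chunks_to_read(&chunks, &pred)); // [1]
}
```

The predicate is conservative: a chunk that overlaps `[lo, hi]` is always read, so pruning never removes a row that could match.
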
This is a scoped re-do of #21267, which was abandoned. It differs in two
important ways.
### 1. It does not touch unrelated operators
#21267 modified `handle_child_pushdown_result` in a number of operators
(`coalesce_batches`, `coop`, `filter`, `projection`, `repartition`, `sort`,
`sort_preserving_merge`, `limit`) to clone-and-rewrap their updated children.
Those changes are **unnecessary**: the filter-pushdown driver
(`push_down_filters` in `datafusion-physical-optimizer`) already rebuilds each
parent with its updated children via `with_new_children_if_necessary` plus the
`ptr_eq` safety net:
```rust
if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, node) {
res.updated_node = Some(updated_node)
}
```
The integration test `test_smj_dynamic_filter_present_in_plan` confirms the
`DynamicFilter` reaches a `DataSourceExec` *through* the inserted `SortExec` +
`RepartitionExec` with none of those operator changes.
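
The safety net relies on pointer identity. Rebuilding a parent with its possibly new children may yield a different `Arc` than the original. When it does, the rebuilt node is kept even if the parent's pushdown result carried no `updated_node`. The toy model below sketches that logic. `Node`, the simplified `with_new_children_if_necessary`, and `apply_safety_net` are hypothetical stand-ins, not the actual `ExecutionPlan` APIs or signatures.

```rust
use std::sync::Arc;

/// Toy plan node; real plans are `Arc<dyn ExecutionPlan>` trait objects.
struct Node {
    name: &'static str,
    children: Vec<Arc<Node>>,
}

/// Toy analogue of `with_new_children_if_necessary`: hands back the same `Arc` when
/// every child is pointer-identical, otherwise builds a new parent over the new children.
fn with_new_children_if_necessary(node: &Arc<Node>, new_children: Vec<Arc<Node>>) -> Arc<Node> {
    let unchanged = node.children.len() == new_children.len()
        && node
            .children
            .iter()
            .zip(&new_children)
            .all(|(old, new)| Arc::ptr_eq(old, new));
    if unchanged {
        Arc::clone(node)
    } else {
        Arc::new(Node { name: node.name, children: new_children })
    }
}

/// Mirrors the quoted snippet: keep the rebuilt parent if the pushdown result carried
/// no `updated_node` but rebuilding produced a different `Arc`.
fn apply_safety_net(
    node: &Arc<Node>,
    new_children: Vec<Arc<Node>>,
    mut updated_node: Option<Arc<Node>>,
) -> Option<Arc<Node>> {
    let rebuilt = with_new_children_if_necessary(node, new_children);
    if updated_node.is_none() && !Arc::ptr_eq(&rebuilt, node) {
        updated_node = Some(rebuilt);
    }
    updated_node
}

fn main() {
    let scan = Arc::new(Node { name: "DataSourceExec", children: vec![] });
    let sort = Arc::new(Node { name: "SortExec", children: vec![Arc::clone(&scan)] });

    // Children unchanged: nothing to propagate.
    assert!(apply_safety_net(&sort, vec![Arc::clone(&scan)], None).is_none());

    // The scan was replaced (say it absorbed a filter), so the SortExec is rebuilt
    // around it without any SortExec-specific pushdown handling.
    let new_scan = Arc::new(Node { name: "DataSourceExec", children: vec![] });
    let rebuilt = apply_safety_net(&sort, vec![Arc::clone(&new_scan)], None).unwrap();
    println!("{} wraps new scan: {}", rebuilt.name, Arc::ptr_eq(&rebuilt.children[0], &new_scan));
}
```
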
### 2. Correctness: build from complete info, publish once
The dynamic-filter coordinator is rewritten to follow the model already used
by `HashJoinExec`'s `SharedBuildAccumulator`: **build the filter from complete
information and publish it exactly once.**
The original `SharedSortMergeBoundsAccumulator` shared a *single* dynamic
filter across the concurrently executing hash partitions feeding the join, and
advanced a one-sided bound as partitions made progress, *raising* it whenever a
partition was exhausted. A bound justified by one partition's progress is only
valid for that partition's keys. Because the single bound gates rows across all
partitions, it could therefore non-deterministically prune valid join rows.
For example, with the existing `joins.slt` `Date32` inner-join test
(`t1.c1 = {1,2,NULL,3}` ⋈ `t2.c1 = {1,NULL,NULL,3}`, `batch_size=2`,
`target_partitions=2`), it dropped the `key=3` row in roughly 4 of 6 runs.
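
Here is a toy model of why a progress-based bound is only safe per partition. Each hash partition sees its own sorted subset of keys, so the lowest key one partition still needs says nothing about the others. The key layout and helper names (`local_lower_bound`, `dropped_by_shared_bound`) are made up for illustration. They do not reproduce the `joins.slt` case.

```rust
/// Lowest key a single partition still needs. In this toy model each partition's
/// remaining keys are sorted ascending, so this is just the first one.
fn local_lower_bound(remaining: &[i64]) -> Option<i64> {
    remaining.first().copied()
}

/// Keys still needed by *some* partition that one shared `key >= lo` filter would drop.
fn dropped_by_shared_bound(remaining_per_partition: &[Vec<i64>], lo: i64) -> Vec<i64> {
    remaining_per_partition
        .iter()
        .flatten()
        .copied()
        .filter(|&k| k < lo)
        .collect()
}

fn main() {
    // Hypothetical layout: hashing sends keys 1 and 3 to partition 0 and key 2 to
    // partition 1. Partition 0 has already consumed key 1; partition 1 has not started.
    let remaining: Vec<Vec<i64>> = vec![vec![3], vec![2]];

    // On its own, partition 0 could justify `key >= 3`...
    let lo = local_lower_bound(&remaining[0]).unwrap();
    // ...but as one filter shared by every partition it prunes key 2,
    // which partition 1 still needs to produce a join row.
    println!("wrongly pruned: {:?}", dropped_by_shared_bound(&remaining, lo));

    // A bound taken over *all* partitions (here, the minimum) prunes nothing needed.
    let safe_lo = remaining.iter().filter_map(|p| local_lower_bound(p)).min().unwrap();
    assert!(dropped_by_shared_bound(&remaining, safe_lo).is_empty());
}
```
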
The accumulator now gathers the feeding side's global `[min, max]` (nulls
skipped) and, only once **all** partitions are exhausted, publishes a static
superset predicate `col >= min AND col <= max` (or `lit(false)` when no
non-null keys were seen) and marks the filter complete.
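
The following is a minimal sketch of the publish-once pattern. It assumes `i64` join keys, with `None` standing for NULL, and uses a plain enum in place of a `PhysicalExpr`. `BoundsAccumulator`, `report_keys`, `partition_done`, and `Published` are hypothetical names. This is not the PR's `SharedSortMergeBoundsAccumulator`; it only shares the same idea. Keys are folded into a global `[min, max]` under a lock, and only the last partition to finish publishes.

```rust
use std::sync::Mutex;
use std::thread;

/// Simplified stand-in for the published `PhysicalExpr`.
#[derive(Debug, Clone, PartialEq)]
enum Published {
    /// `col >= min AND col <= max`
    Range { min: i64, max: i64 },
    /// `lit(false)`: no non-null keys were seen on the feeding side.
    False,
}

struct State {
    bounds: Option<(i64, i64)>, // global [min, max] of non-null keys seen so far
    remaining: usize,           // partitions that have not finished yet
    published: bool,
}

/// Hypothetical publish-once accumulator shared by all partitions.
struct BoundsAccumulator {
    state: Mutex<State>,
}

impl BoundsAccumulator {
    fn new(partitions: usize) -> Self {
        assert!(partitions > 0, "need at least one partition");
        let state = State { bounds: None, remaining: partitions, published: false };
        Self { state: Mutex::new(state) }
    }

    /// Fold a batch of join keys into the global bounds; NULL (`None`) keys are skipped.
    fn report_keys(&self, keys: &[Option<i64>]) {
        let mut s = self.state.lock().unwrap();
        for &k in keys.iter().flatten() {
            let next = match s.bounds {
                None => (k, k),
                Some((lo, hi)) => (lo.min(k), hi.max(k)),
            };
            s.bounds = Some(next);
        }
    }

    /// Called once per partition when its input is exhausted. Only the call that
    /// completes the last partition returns a predicate; every other call returns `None`.
    fn partition_done(&self) -> Option<Published> {
        let mut s = self.state.lock().unwrap();
        if s.published {
            return None; // already published exactly once
        }
        s.remaining -= 1;
        if s.remaining > 0 {
            return None; // bounds are still incomplete
        }
        s.published = true;
        Some(match s.bounds {
            Some((min, max)) => Published::Range { min, max },
            None => Published::False,
        })
    }
}

fn main() {
    let acc = BoundsAccumulator::new(2);
    let partitions: [Vec<Option<i64>>; 2] = [vec![Some(4), None, Some(9)], vec![Some(2), Some(7)]];
    let acc_ref = &acc;
    let results: Vec<Option<Published>> = thread::scope(|scope| {
        let handles: Vec<_> = partitions
            .iter()
            .map(|keys| {
                scope.spawn(move || {
                    acc_ref.report_keys(keys);
                    acc_ref.partition_done()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Whichever partition finishes last publishes `col >= 2 AND col <= 9`.
    let published: Vec<Published> = results.into_iter().flatten().collect();
    println!("{published:?}");
}
```

Nothing is published until the partition counter reaches zero. The result therefore does not depend on thread scheduling, which removes the non-determinism of a progressively advanced bound.
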
A consequence of the publish-once design: for the common `scan → SortExec →
SMJ` shape, the blocking sort drains the scan before the bound is known, so
there is no skip-ahead benefit there; the benefit accrues to pushdown-capable,
already-sorted scans. A future enhancement could route per-partition advancing
bounds via a hashed `CASE` expression (as `HashJoinExec`'s `Partitioned` mode
does) to recover skip-ahead pruning while staying correct.
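
Here is a sketch of the hashed `CASE` idea, assuming per-partition bounds are available. Each row is tested only against the bounds of the partition it hashes to, so one partition's progress cannot prune rows that another partition still needs. `partition_of` uses `rem_euclid` as a stand-in for the real repartitioning hash. `case_filter` is a hypothetical evaluator, not `HashJoinExec`'s actual expression.

```rust
/// Hypothetical stand-in for the repartitioning hash: the real one hashes the key's
/// value; here we simply take the key modulo the partition count.
fn partition_of(key: i64, partitions: usize) -> usize {
    key.rem_euclid(partitions as i64) as usize
}

/// Evaluates, for one row,
/// `CASE partition_of(col) WHEN 0 THEN col BETWEEN lo0 AND hi0 WHEN 1 THEN ... END`.
/// A `None` bound means that partition saw no keys, so no row routed to it can match.
fn case_filter(key: i64, bounds: &[Option<(i64, i64)>]) -> bool {
    match bounds[partition_of(key, bounds.len())] {
        Some((lo, hi)) => lo <= key && key <= hi,
        None => false,
    }
}

fn main() {
    // Partition 0 (even keys here) currently needs [2, 8]; partition 1 (odd keys) needs only 3.
    let bounds = [Some((2, 8)), Some((3, 3))];
    let kept: Vec<i64> = (0..10).filter(|&k| case_filter(k, &bounds)).collect();
    println!("{kept:?}"); // [2, 3, 4, 6, 8]
}
```

Because each arm only narrows the rows of its own partition, every arm can be tightened independently as that partition advances. A single global range would have to stay at `[2, 8]` and would keep keys 5 and 7.
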
## What changes are included in this PR?
- `SortMergeJoinExec` generates dynamic filters in
`gather_filters_for_pushdown` (Post phase) and captures accepted ones in
`handle_child_pushdown_result`.
- New `SharedSortMergeBoundsAccumulator` (publish-once `[min, max]`
superset, mirroring HashJoin).
- The two SMJ stream implementations report join keys to the accumulator.
- `dynamic_filter_updates` metric.
- Tests: a fuzz/integration suite (`smj_filter_pushdown.rs`) including a
multi-partition + NULL-key regression test that runs the previously flaky shape
40×, plus updated EXPLAIN expectations in `joins.slt`, `sort_merge_join.slt`,
and `explain_tree.slt`.
## Are these changes tested?
Yes, by new `smj_filter_pushdown` integration tests (correctness with and
without the filter, plan presence, and a multi-partition NULL-key regression
test), updated sqllogictest EXPLAIN plans, and the existing SMJ unit tests. The
full sqllogictest suite, `datafusion-physical-optimizer`, and
`datafusion-physical-plan` join tests pass.
## Are there any user-facing changes?
EXPLAIN output for plans containing a `SortMergeJoinExec` with
dynamic-filter pushdown enabled (the default) now shows a `DynamicFilter`
pushed to the scans it feeds (as a `FilterExec` for sources that cannot absorb
it, or as a scan predicate for those that can). Like `HashJoinExec`, the filter
is not rendered on the join node itself.
🤖 Generated with [Claude Code](https://claude.com/claude-code)