adriangb opened a new pull request, #22460:
URL: https://github.com/apache/datafusion/pull/22460

   ## Which issue does this PR close?
   
   - Closes #.
   
   N/A — extracted from a design discussion around the duplicated "does this 
filter have a dynamic portion that might change?" / "has the filter changed?" 
patterns (e.g. #22450, `FilePruner`). Happy to file a tracking issue if 
preferred.
   
   ## Rationale for this change
   
   `DynamicFilterPhysicalExpr` has a rich *producer* API (`update()`, 
`mark_complete()`, `wait_update()`, `wait_complete()`), but *consumers* that 
hold a predicate which *contains* dynamic filters have only a bare, recursive 
`snapshot_generation() -> u64`. Several call sites hand-roll the same 
boilerplate around it:
   
   - store a `last_generation: Option<u64>`,
   - recompute `snapshot_generation(&predicate)` (a full tree walk) on 
**every** check,
   - diff it, and rebuild an expensive derived artifact (e.g. a 
`PruningPredicate`) on change.
   
   `FilePruner` does exactly this today, and the runtime row-group pruner in 
#22450 reimplements the identical dance. Neither exploits 
`mark_complete()`, so both keep re-walking the tree even after the filters can 
no longer change.
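
For illustration, the hand-rolled pattern described above looks roughly like the following. This is a minimal sketch using only the standard library: `Expr`, `HandRolledPruner`, and the way generations are combined are hypothetical stand-ins, not DataFusion's actual types or logic.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Toy stand-in for a physical expression tree (hypothetical, not DataFusion's types).
enum Expr {
    /// A leaf that can never change, e.g. a literal or column reference.
    Static,
    /// A dynamic filter whose generation is bumped on every update.
    Dynamic(Arc<AtomicU64>),
    /// A composite node such as AND.
    And(Vec<Expr>),
}

/// Folds generations over the whole tree. How the real function combines
/// child generations is an assumption here; this toy simply sums them.
fn snapshot_generation(expr: &Expr) -> u64 {
    match expr {
        Expr::Static => 0,
        Expr::Dynamic(generation) => generation.load(Ordering::Acquire),
        Expr::And(children) => children
            .iter()
            .map(snapshot_generation)
            .fold(0, u64::wrapping_add),
    }
}

/// A consumer that repeats the store / recompute / diff boilerplate.
struct HandRolledPruner {
    predicate: Expr,
    last_generation: Option<u64>,
    /// Counts rebuilds of the expensive derived artifact.
    rebuilds: usize,
}

impl HandRolledPruner {
    fn new(predicate: Expr) -> Self {
        Self { predicate, last_generation: None, rebuilds: 0 }
    }

    /// Returns true if the derived artifact was rebuilt.
    fn check(&mut self) -> bool {
        // Full tree walk on every call, even if nothing can change anymore.
        let current = snapshot_generation(&self.predicate);
        if self.last_generation == Some(current) {
            return false;
        }
        self.last_generation = Some(current);
        self.rebuilds += 1; // stand-in for rebuilding e.g. a pruning predicate
        true
    }
}
```

Every `check()` pays for the whole traversal, and nothing in this shape can stop checking once the filters are final.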
   
   This PR adds a small consumer-side counterpart so the pattern lives in one 
place.
   
   ## What changes are included in this PR?
   
   **New API (`datafusion-physical-expr`):**
   
   - `DynamicFilterPhysicalExpr::subscribe() -> DynamicFilterSubscription` and 
`is_complete()`. A subscription observes a single filter through its existing 
`tokio::sync::watch` channel — steady-state polling is a single atomic load; 
the lock is taken only when the filter actually moved. A bare `mark_complete()` 
(which re-broadcasts the current generation) is distinguished from a real 
expression change, so it does not trigger a spurious rebuild.
   - `DynamicFilterTracker` walks a (possibly composite) predicate **once**, 
subscribing to every still-incomplete dynamic filter, then answers `changed()` 
by polling only that **shrinking** set (completed filters are dropped). No more 
re-folding `snapshot_generation()` over the whole tree on every batch.
   - `DynamicFilterTracking::classify` returns `Static` / `AllComplete` / 
`Watching(..)` in one traversal, so a caller can decide both *"is a one-shot 
prune worthwhile?"* and *"do I need to keep re-checking?"*.
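
The classify-once, poll-a-shrinking-set idea can be modelled in a few lines. The sketch below is a toy under stated assumptions, not the PR's implementation: it uses plain atomics instead of the `tokio::sync::watch` channel, takes an already-flattened list of dynamic filters instead of walking an expression tree, and all names (`ToyFilter`, `Subscription`, `Tracker`, `Tracking`, `classify`) are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

/// Toy dynamic filter: a generation counter plus a completion flag.
#[derive(Default)]
struct ToyFilter {
    generation: AtomicU64,
    complete: AtomicBool,
}

impl ToyFilter {
    /// Producer side: the expression changed.
    fn update(&self) {
        self.generation.fetch_add(1, Ordering::AcqRel);
    }
    /// Producer side: no further updates will happen.
    fn mark_complete(&self) {
        self.complete.store(true, Ordering::Release);
    }
}

/// Consumer-side view of one filter: remembers the last generation seen.
struct Subscription {
    filter: Arc<ToyFilter>,
    seen: u64,
}

struct Tracker {
    subs: Vec<Subscription>,
}

enum Tracking {
    /// The predicate contains no dynamic filters.
    Static,
    /// It contains dynamic filters, but all are already complete.
    AllComplete,
    /// At least one filter can still change.
    Watching(Tracker),
}

/// One pass over the filters: subscribe only to the incomplete ones.
fn classify(filters: &[Arc<ToyFilter>]) -> Tracking {
    if filters.is_empty() {
        return Tracking::Static;
    }
    let subs: Vec<Subscription> = filters
        .iter()
        .filter(|f| !f.complete.load(Ordering::Acquire))
        .map(|f| Subscription {
            filter: Arc::clone(f),
            seen: f.generation.load(Ordering::Acquire),
        })
        .collect();
    if subs.is_empty() {
        Tracking::AllComplete
    } else {
        Tracking::Watching(Tracker { subs })
    }
}

impl Tracker {
    /// True if any watched filter's expression moved since the last call.
    /// Completed filters are dropped, so the polled set only shrinks.
    fn changed(&mut self) -> bool {
        let mut moved = false;
        self.subs.retain_mut(|sub| {
            // Read completion first so an update made just before
            // `mark_complete()` is still observed by the generation load.
            let complete = sub.filter.complete.load(Ordering::Acquire);
            let current = sub.filter.generation.load(Ordering::Acquire);
            if current != sub.seen {
                sub.seen = current;
                moved = true;
            }
            !complete
        });
        moved
    }

    /// Number of filters still being polled.
    fn watching(&self) -> usize {
        self.subs.len()
    }
}
```

In this toy, completion lives in a separate flag, so a bare `mark_complete()` never looks like a change, and an update followed immediately by completion is reported once before the filter is dropped from the set.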
   
   **Consumer migration:**
   
   - `FilePruner` now classifies its predicate once at construction and 
rebuilds the pruning predicate on the first check and, after that, only when a 
watched filter moves.
   - The Parquet opener skips wrapping the scan in `EarlyStoppingStream` when 
the predicate is `Static`/`AllComplete` — the up-front `prune_file` check 
already captured everything such a predicate can prune, so per-batch 
re-checking was pure overhead.
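
The two consumer-side decisions above can be sketched as follows. This is a simplified, hypothetical model: `Tracking` here watches a single generation counter, and `ToyFilePruner` and `needs_recheck` are illustrative names, not the actual `FilePruner` or Parquet opener code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical, simplified classification result.
enum Tracking {
    Static,
    AllComplete,
    /// Watches one generation counter; `seen` is the last observed value.
    Watching { generation: Arc<AtomicU64>, seen: u64 },
}

impl Tracking {
    /// True if the watched filter moved since the last call.
    fn changed(&mut self) -> bool {
        match self {
            Tracking::Watching { generation, seen } => {
                let current = generation.load(Ordering::Acquire);
                let moved = current != *seen;
                *seen = current;
                moved
            }
            // Static and already-complete predicates can never change.
            _ => false,
        }
    }

    /// Per-batch re-checking (the early-stopping wrapper in the text) is
    /// only worthwhile while something can still change.
    fn needs_recheck(&self) -> bool {
        matches!(self, Tracking::Watching { .. })
    }
}

struct ToyFilePruner {
    tracking: Tracking,
    built: bool,
    /// Counts rebuilds of the derived pruning artifact.
    rebuilds: usize,
}

impl ToyFilePruner {
    /// Classification happens once, at construction.
    fn new(tracking: Tracking) -> Self {
        Self { tracking, built: false, rebuilds: 0 }
    }

    /// Rebuilds on the first check, then only when a watched filter moved.
    fn check(&mut self) {
        let moved = self.tracking.changed();
        if !self.built || moved {
            self.built = true;
            self.rebuilds += 1; // stand-in for rebuilding the pruning predicate
        }
    }
}
```

A caller would consult `needs_recheck()` once when opening a file and skip the per-batch wrapper entirely for `Static` and `AllComplete` predicates.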
   
   This is intentionally scoped as a **draft**. Natural follow-ups: migrate the 
runtime `RowGroupPruner` (#22450) onto the same tracker; replace the remaining 
`is_dynamic_physical_expr` / free-function `snapshot_generation` call sites; 
and (separately) decide whether the `snapshot_generation()` trait method + FFI 
entry can eventually be retired.
   
   ## Are these changes tested?
   
   Yes — new unit tests in `dynamic_filter_tracker.rs` cover: static / 
already-complete / watching classification, detecting an update exactly once, 
`mark_complete()` not counting as a change, a coalesced update+complete 
reported once, and independent tracking of multiple filters in a composite 
predicate. Existing `datafusion-pruning` and `datafusion-datasource-parquet` 
suites pass unchanged.
   
   ## Are there any user-facing changes?
   
   New public API in `datafusion-physical-expr` (`DynamicFilterTracker`, 
`DynamicFilterTracking`, `DynamicFilterSubscription`, `DynamicFilterChange`, 
and `DynamicFilterPhysicalExpr::{subscribe, is_complete}`). No behavior change 
for end users beyond avoiding redundant per-batch re-pruning work for 
non-dynamic predicates.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

