adriangb commented on code in PR #22460:
URL: https://github.com/apache/datafusion/pull/22460#discussion_r3296096656
##########
datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs:
##########
@@ -326,6 +329,31 @@ impl DynamicFilterPhysicalExpr {
.await;
}
+ /// Returns `true` if this filter has been marked complete via
+ /// [`Self::mark_complete`] and will therefore never change again.
+ pub(crate) fn is_complete(&self) -> bool {
+ self.inner.read().is_complete
+ }
+
+ /// Subscribe to this filter's updates for cheap, synchronous change
+ /// detection.
+ ///
+ /// The returned [`DynamicFilterSubscription`] lets a consumer poll whether
+ /// the filter's expression has advanced since it last looked, without
+ /// re-walking a predicate tree or re-deriving a generation on every check.
+ /// This is the building block used by [`DynamicFilterTracker`] to watch
+ /// every dynamic filter inside a (possibly composite) predicate.
+ pub(crate) fn subscribe(&self) -> DynamicFilterSubscription {
+ let mut receiver = self.state_watch.subscribe();
+ // Mark the current state as already-seen so the first `observe()` only
+ // reports updates that happen *after* subscription.
+ let last_generation = receiver.borrow_and_update().generation();
Review Comment:
Good call. Added `update_before_subscribe_is_not_reported` in `tracker.rs`.
The test calls `update()` on the filter *before* the tracker subscribes,
asserts that the first `changed()` returns `false`, and then checks that a
later update is still reported. This pins the `borrow_and_update()` snapshot
in `subscribe()`. Folded into the first commit.
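For readers following along without the PR checked out, here is a minimal std-only sketch of the property that test pins. It is a model, not the code in this PR: `ToyFilter` and `ToySubscription` are hypothetical names, and a plain `AtomicU64` stands in for the `tokio::sync::watch` channel and its `borrow_and_update()` call.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the dynamic filter: only a generation counter.
#[derive(Debug, Default)]
struct ToyFilter {
    generation: AtomicU64,
}

impl ToyFilter {
    /// Models an expression update by bumping the generation.
    fn update(&self) {
        self.generation.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for the subscription handle.
struct ToySubscription {
    filter: Arc<ToyFilter>,
    last_generation: u64,
}

impl ToySubscription {
    /// Snapshot the current generation at subscription time, so updates
    /// that happened earlier are treated as already seen.
    fn subscribe(filter: &Arc<ToyFilter>) -> Self {
        Self {
            filter: Arc::clone(filter),
            last_generation: filter.generation.load(Ordering::SeqCst),
        }
    }

    /// Returns `true` only if the generation advanced since the last call
    /// (or since subscription, for the first call).
    fn changed(&mut self) -> bool {
        let current = self.filter.generation.load(Ordering::SeqCst);
        let changed = current > self.last_generation;
        self.last_generation = current;
        changed
    }
}
```

In this model, calling `update()` and then `ToySubscription::subscribe` makes the first `changed()` return `false`, while an `update()` made after subscribing is reported exactly once.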
##########
datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs:
##########
@@ -522,6 +550,66 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
}
}
+/// The result of polling a [`DynamicFilterSubscription`].
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) struct DynamicFilterChange {
+ /// The filter's expression advanced since the previous observation.
+ pub(crate) changed: bool,
+ /// The filter has been marked complete; it will never change again and the
+ /// subscription can be dropped.
+ pub(crate) complete: bool,
+}
+
+/// A cheap, synchronous handle for observing updates to a single
+/// [`DynamicFilterPhysicalExpr`].
+///
+/// Obtained via [`DynamicFilterPhysicalExpr::subscribe`]. Steady-state polling
+/// via [`Self::observe`] is a single atomic load (the underlying
+/// [`tokio::sync::watch`] version counter); the lock is only taken when the
+/// filter has actually been updated.
+#[derive(Debug)]
+pub(crate) struct DynamicFilterSubscription {
+ receiver: watch::Receiver<FilterState>,
+ /// Last generation we reported as "seen". Used to distinguish a real
+ /// expression update from a bare [`DynamicFilterPhysicalExpr::mark_complete`]
+ /// (which re-broadcasts the current generation without changing the
+ /// expression).
+ last_generation: u64,
+}
+
+impl DynamicFilterSubscription {
+ /// Observe the latest state of the filter.
+ ///
+ /// Reports whether the filter's expression advanced since the previous call
+ /// and whether it has since been marked complete. Cheap when nothing has
+ /// changed: a single atomic comparison with no lock acquisition.
+ pub(crate) fn observe(&mut self) -> DynamicFilterChange {
+ match self.receiver.has_changed() {
+ Ok(true) => {
+ let state = *self.receiver.borrow_and_update();
+ let changed = state.generation() > self.last_generation;
+ if changed {
+ self.last_generation = state.generation();
+ }
+ DynamicFilterChange {
+ changed,
+ complete: matches!(state, FilterState::Complete { .. }),
+ }
+ }
+ Ok(false) => DynamicFilterChange {
+ changed: false,
+ complete: false,
+ },
+ // The sender was dropped: no further updates are possible, so treat
+ // the subscription as complete.
+ Err(_) => DynamicFilterChange {
Review Comment:
Done. The `Err(_)` arm now calls `debug_assert!(false, ...)`: it panics in
debug builds if the sender is ever dropped while a subscription is alive, so a
real bug isn't masked, and degrades to `complete: true` in release builds. I
checked that clippy's `assertions_on_constants` does **not** flag
`debug_assert!(false, ...)` under `-D warnings`, so no `cfg!` workaround was
needed.
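A minimal std-only sketch of that pattern, for illustration only. `Change` and `on_poll` are hypothetical names, and the `Result<bool, ()>` argument stands in for the result of `has_changed()` on the watch receiver; the actual arm in the PR may differ in its message and surrounding code.

```rust
/// Hypothetical mirror of the change report returned by a poll.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Change {
    changed: bool,
    complete: bool,
}

/// Hypothetical poll handler: `Err(())` models "the sender was dropped".
fn on_poll(result: Result<bool, ()>) -> Change {
    match result {
        Ok(changed) => Change {
            changed,
            complete: false,
        },
        Err(()) => {
            // Debug builds: panic loudly, since this state is assumed to
            // be unreachable while a subscription is alive.
            debug_assert!(false, "sender dropped while a subscription is alive");
            // Release builds: the assertion compiles out, so fall back to
            // reporting the subscription as complete.
            Change {
                changed: false,
                complete: true,
            }
        }
    }
}
```

With debug assertions enabled, `on_poll(Err(()))` panics; with them disabled, it returns `Change { changed: false, complete: true }`.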
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]