zhuqi-lucas commented on code in PR #22460:
URL: https://github.com/apache/datafusion/pull/22460#discussion_r3296061149


##########
datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs:
##########
@@ -326,6 +329,31 @@ impl DynamicFilterPhysicalExpr {
             .await;
     }
 
+    /// Returns `true` if this filter has been marked complete via
+    /// [`Self::mark_complete`] and will therefore never change again.
+    pub(crate) fn is_complete(&self) -> bool {
+        self.inner.read().is_complete
+    }
+
+    /// Subscribe to this filter's updates for cheap, synchronous change
+    /// detection.
+    ///
+    /// The returned [`DynamicFilterSubscription`] lets a consumer poll whether
+    /// the filter's expression has advanced since it last looked, without
+    /// re-walking a predicate tree or re-deriving a generation on every check.
+    /// This is the building block used by [`DynamicFilterTracker`] to watch
+    /// every dynamic filter inside a (possibly composite) predicate.
+    pub(crate) fn subscribe(&self) -> DynamicFilterSubscription {
+        let mut receiver = self.state_watch.subscribe();
+        // Mark the current state as already-seen so the first `observe()` only
+        // reports updates that happen *after* subscription.
+        let last_generation = receiver.borrow_and_update().generation();

Review Comment:
   Minor: 
   Missing test for the load-bearing `borrow_and_update()` in `subscribe()`: 
tracker constructed *after* an `update()` should report `changed() == false` on 
first call. Worth pinning as a regression test.



##########
datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs:
##########
@@ -522,6 +550,66 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
     }
 }
 
+/// The result of polling a [`DynamicFilterSubscription`].
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) struct DynamicFilterChange {
+    /// The filter's expression advanced since the previous observation.
+    pub(crate) changed: bool,
+    /// The filter has been marked complete; it will never change again and the
+    /// subscription can be dropped.
+    pub(crate) complete: bool,
+}
+
+/// A cheap, synchronous handle for observing updates to a single
+/// [`DynamicFilterPhysicalExpr`].
+///
+/// Obtained via [`DynamicFilterPhysicalExpr::subscribe`]. Steady-state polling
+/// via [`Self::observe`] is a single atomic load (the underlying
+/// [`tokio::sync::watch`] version counter); the lock is only taken when the
+/// filter has actually been updated.
+#[derive(Debug)]
+pub(crate) struct DynamicFilterSubscription {
+    receiver: watch::Receiver<FilterState>,
+    /// Last generation we reported as "seen". Used to distinguish a real
+    /// expression update from a bare 
[`DynamicFilterPhysicalExpr::mark_complete`]
+    /// (which re-broadcasts the current generation without changing the
+    /// expression).
+    last_generation: u64,
+}
+
+impl DynamicFilterSubscription {
+    /// Observe the latest state of the filter.
+    ///
+    /// Reports whether the filter's expression advanced since the previous 
call
+    /// and whether it has since been marked complete. Cheap when nothing has
+    /// changed: a single atomic comparison with no lock acquisition.
+    pub(crate) fn observe(&mut self) -> DynamicFilterChange {
+        match self.receiver.has_changed() {
+            Ok(true) => {
+                let state = *self.receiver.borrow_and_update();
+                let changed = state.generation() > self.last_generation;
+                if changed {
+                    self.last_generation = state.generation();
+                }
+                DynamicFilterChange {
+                    changed,
+                    complete: matches!(state, FilterState::Complete { .. }),
+                }
+            }
+            Ok(false) => DynamicFilterChange {
+                changed: false,
+                complete: false,
+            },
+            // The sender was dropped: no further updates are possible, so 
treat
+            // the subscription as complete.
+            Err(_) => DynamicFilterChange {

Review Comment:
   `observe()` here treats receiver `Err(_)` (sender dropped) as `complete: 
true`. The sender is kept alive by the predicate `Arc` so this branch shouldn't 
be reachable in practice — consider a `debug_assert!` / `log::warn!` so a 
future bug doesn't get masked.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to