LiaCastaneda commented on code in PR #20416:
URL: https://github.com/apache/datafusion/pull/20416#discussion_r2829385055
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -180,18 +243,99 @@ impl DynamicFilterPhysicalExpr {
}
}
+ /// Reconstructs a [`DynamicFilterPhysicalExpr`] from a snapshot.
+ ///
+ /// This is a low-level API intended for use by the proto deserialization
layer.
+ #[doc(hidden)]
+ pub fn new_from_snapshot(snapshot: DynamicFilterSnapshot) -> Self {
+ let DynamicFilterSnapshot {
+ children,
+ remapped_children,
+ generation,
+ inner_expr,
+ is_complete,
+ } = snapshot;
+
+ let state = if is_complete {
+ FilterState::Complete { generation }
+ } else {
+ FilterState::InProgress { generation }
+ };
+ let (state_watch, _) = watch::channel(state);
+
+ Self {
+ children,
+ remapped_children,
+ inner: Arc::new(RwLock::new(Inner {
+ generation,
+ expr: inner_expr,
+ is_complete,
+ })),
+ state_watch,
+ data_type: Arc::new(RwLock::new(None)),
+ nullable: Arc::new(RwLock::new(None)),
+ }
+ }
+
+ /// Atomically captures all state needed for serialization into a
[`DynamicFilterSnapshot`].
+ ///
+ /// This is a low-level API intended for use by the proto deserialization
layer.
+ #[doc(hidden)]
+ pub fn current_snapshot(&self) -> DynamicFilterSnapshot {
+ let (generation, inner_expr, is_complete) = {
+ let inner = self.inner.read();
+ (inner.generation, Arc::clone(&inner.expr), inner.is_complete)
+ };
+ DynamicFilterSnapshot {
+ children: self.children.clone(),
+ remapped_children: self.remapped_children.clone(),
+ generation,
+ inner_expr,
+ is_complete,
+ }
+ }
+
+ /// Create a new [`DynamicFilterPhysicalExpr`] from `self`, except it
overwrites the
+ /// internal state with the source filter's state.
+ ///
+ /// This is a low-level API intended for use by the proto deserialization
layer.
+ ///
+ /// # Safety
+ ///
+ /// The dynamic filter should not be in use when calling this method,
otherwise there
+ /// may be undefined behavior. This method may do the following or worse:
+ /// - transition the state to complete without notifying the watch
+ /// - cause a generation number to be emitted which is out of order
+ pub fn new_from_source(&self, source: &DynamicFilterPhysicalExpr) ->
Result<Self> {
+ // Best effort check that no one is subscribed.
+ if self.state_watch.receiver_count() > 0 {
+ return internal_err!(
+ "Cannot replace the inner state of a DynamicFilterPhysicalExpr
that has subscribers"
+ );
Review Comment:
Subscribed consumers exist only if someone calls `wait_complete` or
`wait_update`. However, consumers are not forced to call either of these
methods, and in practice most consumers (like Parquet) never call them. I
think another way to check whether there are any consumers is to do
something similar to `is_used` and check the strong count of `inner`.
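
To illustrate the point, here is a minimal std-only sketch. `Filter`, `Inner`, `share` and `inner_is_shared` are hypothetical stand-ins rather than DataFusion APIs, and the watch channel is left out entirely. It only shows that a consumer holding a clone of `inner` is visible through the strong count even though it never subscribed to anything:

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the shared state behind a dynamic filter.
struct Inner {
    generation: u64,
}

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`; only the field
/// relevant to the check is modelled here.
struct Filter {
    inner: Arc<RwLock<Inner>>,
}

impl Filter {
    fn new() -> Self {
        Self {
            inner: Arc::new(RwLock::new(Inner { generation: 0 })),
        }
    }

    /// Models a consumer that shares the inner state but never calls a
    /// `wait_*` method, so it would not show up as a watch subscriber.
    fn share(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Reads the generation, just to show the state is reachable.
    fn generation(&self) -> u64 {
        self.inner.read().unwrap().generation
    }

    /// True if anyone else holds a strong reference to `inner`.
    fn inner_is_shared(&self) -> bool {
        Arc::strong_count(&self.inner) > 1
    }
}
```

Like `receiver_count`, this is still a best-effort check: the count can change right after it is read.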
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -327,6 +467,14 @@ impl DynamicFilterPhysicalExpr {
Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
}
+ /// Returns a unique identifier for the inner shared state.
+ ///
+ /// Useful for checking if two [`Arc<PhysicalExpr>`] with the same
+ /// underlying [`DynamicFilterPhysicalExpr`] are the same.
+ pub fn inner_id(&self) -> u64 {
+ Arc::as_ptr(&self.inner) as *const () as u64
+ }
Review Comment:
You might be interested in looking at
https://github.com/apache/datafusion/issues/19715#issuecomment-3733169909
where we found that in some cases it was not the `inner` struct that was
cloned but the outer struct (because we never called
`reassign_expr_columns -> new_with_children`). I have an example DataSource
repro
[here](https://github.com/LiaCastaneda/datafusion/commit/5dedd792670a5c0ea4be43e4ee3c88f355c5fc48)
where the inner struct is not cloned.
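
A rough std-only sketch of the two cloning paths follows. `Filter`, `Inner` and `with_shared_inner` are hypothetical names; `with_shared_inner` only approximates what `new_with_children` does to the shared state. `is_used` and `inner_id` mirror the bodies quoted in the diff above:

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the shared inner state.
struct Inner {
    generation: u64,
}

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`.
struct Filter {
    inner: Arc<RwLock<Inner>>,
}

impl Filter {
    fn new() -> Self {
        Self {
            inner: Arc::new(RwLock::new(Inner { generation: 0 })),
        }
    }

    /// Assumed approximation of `new_with_children`: a new outer struct
    /// that shares the same `inner`.
    fn with_shared_inner(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Address of the shared allocation, used as an identity.
    fn inner_id(&self) -> u64 {
        Arc::as_ptr(&self.inner) as *const () as u64
    }

    fn generation(&self) -> u64 {
        self.inner.read().unwrap().generation
    }
}

/// Checks both counts, because cloning the outer `Arc` leaves the
/// strong count of `inner` untouched.
fn is_used(this: &Arc<Filter>) -> bool {
    Arc::strong_count(this) > 1 || Arc::strong_count(&this.inner) > 1
}
```

Cloning the outer `Arc` keeps the strong count of `inner` at 1, so a check that looks only at `inner` would miss that holder. `inner_id` still matches in both paths, since both clones point at the same allocation.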
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]