LiaCastaneda commented on code in PR #20416:
URL: https://github.com/apache/datafusion/pull/20416#discussion_r2830332263


##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -438,16 +587,6 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
     fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         self.render(f, |expr, f| expr.fmt_sql(f))
     }
-
-    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
-        // Return the current expression as a snapshot.
-        Ok(Some(self.current()?))
-    }
-
-    fn snapshot_generation(&self) -> u64 {
-        // Return the current generation of the expression.
-        self.inner.read().generation
-    }

Review Comment:
   I wonder if removing these implementations could break any existing assumptions. For example, the `ParquetOpener` [here](https://github.com/apache/datafusion/blob/0022d8e503f0dc0ee40ead545114147fc703e263/datafusion/datasource-parquet/src/opener.rs#L333) needs to know whether a `PhysicalExpr` is dynamic, which it determines by calling `snapshot_generation`. If we don't implement this method, it will always return the default of 0, so `is_dynamic_physical_expr` would return false and I think no predicates would be pushed down to Parquet.
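   A simplified, self-contained model of this concern, assuming (as described above) that the trait's default `snapshot_generation` returns 0 and that dynamic-ness is detected by a non-zero generation. `ExprLike`, `is_dynamic`, and the two filter structs are hypothetical stand-ins, not DataFusion's actual `PhysicalExpr` or `is_dynamic_physical_expr`.

```rust
// Hypothetical stand-in for `PhysicalExpr`; only the generation hook is modelled.
trait ExprLike {
    // Assumed default: expressions that don't override this report 0.
    fn snapshot_generation(&self) -> u64 {
        0
    }
}

// A filter that still overrides the hook and reports its current generation.
struct FilterWithGeneration {
    generation: u64,
}

impl ExprLike for FilterWithGeneration {
    fn snapshot_generation(&self) -> u64 {
        self.generation
    }
}

// A filter whose override was removed, so it falls back to the default.
struct FilterWithoutGeneration {
    #[allow(dead_code)]
    generation: u64,
}

impl ExprLike for FilterWithoutGeneration {}

// Hypothetical check: an expression counts as dynamic only if it reports
// a non-zero generation.
fn is_dynamic(expr: &dyn ExprLike) -> bool {
    expr.snapshot_generation() != 0
}

fn main() {
    let with_override = FilterWithGeneration { generation: 1 };
    let without_override = FilterWithoutGeneration { generation: 1 };

    assert!(is_dynamic(&with_override));
    // Without the override the filter looks static, so any caller that gates
    // work on this check would skip it.
    assert!(!is_dynamic(&without_override));
}
```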



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -180,18 +291,52 @@ impl DynamicFilterPhysicalExpr {
         }
     }
 
+    /// Create a new [`DynamicFilterPhysicalExpr`] from `self`, except it overwrites the
+    /// internal state with the source filter's state.
+    ///
+    /// This is a low-level API intended for use by the proto deserialization layer.
+    ///
+    /// # Safety
+    ///
+    /// The dynamic filter should not be in use when calling this method, otherwise there
+    /// may be undefined behavior. Changing the inner state of a filter may do the following:
+    /// - transition the state to complete without notifying the watch
+    /// - cause a generation number to be emitted which is out of order
+    pub fn new_from_source(
+        self: &Arc<Self>,
+        source: &DynamicFilterPhysicalExpr,
+    ) -> Result<Self> {
+        // If there's any references to this filter or any watchers, we should not replace the
+        // inner state.
+        // inner state.
+        if self.is_used() || self.state_watch.receiver_count() > 0 {

Review Comment:
   If there are watchers/consumers, `is_used()` will already return `true`, so I think we can just check `is_used()` here.
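   A minimal sketch of the simplified guard, assuming (as this comment reasons) that every watcher/consumer holds its own `Arc` clone of the filter while it waits. `Filter`, `Consumer`, and `ensure_replaceable` are hypothetical; only the `is_used` body mirrors the check in this PR.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical model: outer filter wrapping shared inner state.
struct Filter {
    inner: Arc<RwLock<u64>>, // stand-in for the real inner state
}

impl Filter {
    // Same shape as `is_used` in the PR: in use if either `Arc` has another owner.
    fn is_used(self: &Arc<Self>) -> bool {
        Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
    }
}

// Hypothetical consumer, assumed to keep an `Arc` clone while it waits on updates.
struct Consumer {
    _filter: Arc<Filter>,
}

// The `new_from_source` guard reduced to the single `is_used` check.
fn ensure_replaceable(filter: &Arc<Filter>) -> Result<(), String> {
    if filter.is_used() {
        return Err("filter is in use; refusing to replace inner state".to_string());
    }
    Ok(())
}

fn main() {
    let filter = Arc::new(Filter { inner: Arc::new(RwLock::new(0)) });
    assert!(ensure_replaceable(&filter).is_ok());

    // A live consumer alone is enough to trip the guard.
    let consumer = Consumer { _filter: Arc::clone(&filter) };
    assert!(ensure_replaceable(&filter).is_err());

    drop(consumer);
    assert!(ensure_replaceable(&filter).is_ok());
}
```

   Whether the real watch receivers always come with such a clone is the assumption to verify; if any watcher could exist without one, the `receiver_count()` check would still be needed.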



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -438,16 +587,6 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
     fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         self.render(f, |expr, f| expr.fmt_sql(f))
     }
-
-    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {

Review Comment:
   IIUC we removed `snapshot()` here because `snapshot_physical_expr` is called unconditionally at the top of `serialize_physical_expr_with_converter`. If `snapshot()` returns `Some(self.current()?)`, the `DynamicFilterPhysicalExpr` gets replaced by its `inner` expression before the `downcast_ref::<DynamicFilterPhysicalExpr>()` branch ever gets a chance to see it. By returning `None`, the node passes through unchanged and gets serialized as a whole, which is the behaviour we want.
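   A simplified model of the ordering problem, under stated assumptions: `Expr`, `DynamicFilter`, `snapshot_expr`, and `serializes_as_dynamic` are hypothetical stand-ins for `PhysicalExpr`, `DynamicFilterPhysicalExpr`, `snapshot_physical_expr`, and the downcast branch in the serializer, and only a single (root) node is snapshotted. The `collapse` flag toggles between the old `snapshot()` behaviour and the one in this PR.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical stand-in for `PhysicalExpr`, reduced to the two relevant hooks.
trait Expr {
    fn as_any(&self) -> &dyn Any;
    // Assumed default: nothing to substitute.
    fn snapshot(&self) -> Option<Arc<dyn Expr>> {
        None
    }
}

struct Literal;
impl Expr for Literal {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// `collapse = true` models the old `Some(self.current()?)`; `false` models this PR.
struct DynamicFilter {
    current: Arc<dyn Expr>,
    collapse: bool,
}
impl Expr for DynamicFilter {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn snapshot(&self) -> Option<Arc<dyn Expr>> {
        if self.collapse { Some(Arc::clone(&self.current)) } else { None }
    }
}

// Replace the node with its snapshot if it has one, otherwise keep it.
fn snapshot_expr(expr: Arc<dyn Expr>) -> Arc<dyn Expr> {
    match expr.snapshot() {
        Some(snap) => snap,
        None => expr,
    }
}

// Serializer ordering under discussion: snapshot first, then downcast.
fn serializes_as_dynamic(expr: Arc<dyn Expr>) -> bool {
    let expr = snapshot_expr(expr);
    expr.as_any().downcast_ref::<DynamicFilter>().is_some()
}

fn main() {
    let inner: Arc<dyn Expr> = Arc::new(Literal);

    // Old behaviour: the filter is collapsed before the downcast runs.
    let old_filter: Arc<dyn Expr> =
        Arc::new(DynamicFilter { current: Arc::clone(&inner), collapse: true });
    assert!(!serializes_as_dynamic(old_filter));

    // PR behaviour: `snapshot()` returns `None`, so the node survives.
    let pr_filter: Arc<dyn Expr> = Arc::new(DynamicFilter { current: inner, collapse: false });
    assert!(serializes_as_dynamic(Arc::clone(&pr_filter)));

    // Side effect: other snapshot-based callers also get the filter back
    // instead of its collapsed inner expression.
    let seen = snapshot_expr(pr_filter);
    assert!(seen.as_any().downcast_ref::<Literal>().is_none());
}
```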
   
   However, I wonder if a safer approach would be to check for `DynamicFilterPhysicalExpr` before calling `snapshot_physical_expr` in `to_proto`, and only call `snapshot_physical_expr` if the expression is not a `DynamicFilterPhysicalExpr`. That way `snapshot()` could be restored to return `Some(self.current()?)`, which would also fix a regression for other callers that depend on `snapshot()` returning the collapsed expression. I think [PruningPredicate::try_new](https://github.com/apache/datafusion/blob/0022d8e503f0dc0ee40ead545114147fc703e263/datafusion/pruning/src/pruning_predicate.rs#L464) calls `snapshot_physical_expr_opt` to transform a `DynamicFilterPhysicalExpr` into an expression it can analyze for pruning, so with the current approach that caller gets silently broken.
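   A rough sketch of the proposed ordering, with `snapshot()` restored. All names (`Expr`, `DynamicFilter`, `snapshot_expr`, `to_proto`, `Proto`) are hypothetical stand-ins rather than the real `to_proto` code, and the serializer output is reduced to a two-variant enum.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical stand-in for `PhysicalExpr`.
trait Expr {
    fn as_any(&self) -> &dyn Any;
    fn snapshot(&self) -> Option<Arc<dyn Expr>> {
        None
    }
}

struct Literal;
impl Expr for Literal {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Stand-in for `DynamicFilterPhysicalExpr` with `snapshot()` restored to
// return the current inner expression.
struct DynamicFilter {
    current: Arc<dyn Expr>,
}
impl Expr for DynamicFilter {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn snapshot(&self) -> Option<Arc<dyn Expr>> {
        Some(Arc::clone(&self.current))
    }
}

fn snapshot_expr(expr: Arc<dyn Expr>) -> Arc<dyn Expr> {
    match expr.snapshot() {
        Some(snap) => snap,
        None => expr,
    }
}

// Hypothetical serializer output.
#[derive(Debug, PartialEq)]
enum Proto {
    Dynamic,
    Snapshotted,
}

fn to_proto(expr: Arc<dyn Expr>) -> Proto {
    // Check the un-snapshotted node first so the dynamic filter stays whole.
    if expr.as_any().downcast_ref::<DynamicFilter>().is_some() {
        return Proto::Dynamic;
    }
    // Everything else is snapshotted as before.
    let _snapshotted = snapshot_expr(expr);
    Proto::Snapshotted
}

fn main() {
    let filter: Arc<dyn Expr> = Arc::new(DynamicFilter { current: Arc::new(Literal) });

    // Serialization keeps the filter intact...
    assert_eq!(to_proto(Arc::clone(&filter)), Proto::Dynamic);
    // ...while a pruning-style caller still sees the collapsed inner expression.
    assert!(snapshot_expr(filter).as_any().downcast_ref::<Literal>().is_some());
    assert_eq!(to_proto(Arc::new(Literal)), Proto::Snapshotted);
}
```

   This only checks the root node. Since `snapshot_physical_expr` rewrites the whole expression tree, a real change would presumably need the same check wherever a dynamic filter can appear nested inside a larger expression.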



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -327,6 +467,14 @@ impl DynamicFilterPhysicalExpr {
         Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
     }
 
+    /// Returns a unique identifier for the inner shared state.
+    ///
+    /// Useful for checking if two [`Arc<PhysicalExpr>`] with the same
+    /// underlying [`DynamicFilterPhysicalExpr`] are the same.
+    pub fn inner_id(&self) -> u64 {
+        Arc::as_ptr(&self.inner) as *const () as u64
+    }

Review Comment:
   Yes, that is correct: we should check for both, because both are possible.
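   A small model of the two cases, assuming "both" refers to the two strong counts in `is_used`. `Filter` is a hypothetical stand-in; `is_used` and `inner_id` mirror the bodies shown in the diff. Case 2 (a distinct outer filter sharing the same inner `Arc`) is an illustrative assumption about how such sharing could arise.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical model: outer filter wrapping shared inner state.
struct Filter {
    inner: Arc<RwLock<u64>>,
}

impl Filter {
    // Same check as `is_used` in the diff.
    fn is_used(self: &Arc<Self>) -> bool {
        Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
    }

    // Same idea as `inner_id`: the address of the shared inner allocation.
    fn inner_id(&self) -> u64 {
        Arc::as_ptr(&self.inner) as *const () as u64
    }
}

fn main() {
    let a = Arc::new(Filter { inner: Arc::new(RwLock::new(0)) });
    assert!(!a.is_used());

    // Case 1: the outer `Arc` is cloned; only the outer count rises.
    let a_clone = Arc::clone(&a);
    assert!(a.is_used());
    assert_eq!(a.inner_id(), a_clone.inner_id());
    drop(a_clone);

    // Case 2: a distinct outer filter shares the same inner state; the outer
    // count stays at 1, so only the inner count reveals the sharing.
    let b = Arc::new(Filter { inner: Arc::clone(&a.inner) });
    assert_eq!(Arc::strong_count(&a), 1);
    assert!(a.is_used());
    assert_eq!(a.inner_id(), b.inner_id());

    // An independently constructed filter has a different inner id.
    let c = Arc::new(Filter { inner: Arc::new(RwLock::new(0)) });
    assert_ne!(a.inner_id(), c.inner_id());
}
```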


