gabotechs commented on code in PR #20416:
URL: https://github.com/apache/datafusion/pull/20416#discussion_r2851286605
##########
datafusion/proto/proto/datafusion.proto:
##########
@@ -860,6 +860,12 @@ message PhysicalExprNode {
// across serde roundtrips.
optional uint64 expr_id = 30;
+ // For DynamicFilterPhysicalExpr, this identifies the shared inner state.
+ // Multiple expressions may have different expr_id values (different outer Arc wrappers)
+ // but the same dynamic_filter_inner_id (shared inner state).
+ // Used to reconstruct shared inner state during deserialization.
+ optional uint64 dynamic_filter_inner_id = 31;
+
Review Comment:
I think we should find ways of not leaking this detail here.
##########
datafusion/proto/src/physical_plan/mod.rs:
##########
@@ -3918,24 +3936,51 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
where
Self: Sized,
{
- if let Some(expr_id) = proto.expr_id {
- // Check cache first
- if let Some(cached) = self.cache.borrow().get(&expr_id) {
- return Ok(Arc::clone(cached));
+ // The entire expr is cached, so re-use it.
+ if let Some(expr_id) = proto.expr_id
+ && let Some(cached) = self.cache.borrow().get(&expr_id)
+ {
+ return Ok(Arc::clone(cached));
+ }
+
+ // Cache miss, we must deserialize the expr.
+ let mut expr =
+ parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?;
+
+ // Check if we need to share inner state with a cached dynamic filter
+ if let Some(dynamic_filter_id) = proto.dynamic_filter_inner_id {
Review Comment:
The amount of special-casing for dynamic filters in the protobuf
code seems too big in this PR.
Dynamic filters claim to be normal `PhysicalExpr`s, yet they still need
special treatment in several parts of the codebase, which makes me think
that there might be better ways of approaching things in general.
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -327,6 +468,14 @@ impl DynamicFilterPhysicalExpr {
Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
}
+ /// Returns a unique identifier for the inner shared state.
+ ///
+ /// Useful for checking if two [`Arc<PhysicalExpr>`] with the same
+ /// underlying [`DynamicFilterPhysicalExpr`] are the same.
+ pub fn inner_id(&self) -> u64 {
+ Arc::as_ptr(&self.inner) as *const () as u64
Review Comment:
Overall, I think the way dynamic filters currently rely on raw pointer
addresses is a bit like playing with fire.
For example:
- The `is_used()` method returns `true` if the strong count of either the
inner or the outer `Arc` is greater than one, but you can perfectly well have
a strong count greater than one just because of how you happen to lay out your
Rust code while the filter is still unused (e.g., in optimization passes the
strong count can easily be 6 or 7, and the filter is still unused).
- Deriving a dynamic filter's unique identifier from the u64 representation of
a raw pointer address forces you to call into the operating system in
serialization code (`std::process::id()`), which would not be necessary with a
proper dynamic filter id.
My impression is: the use of raw pointer addresses as a proxy for
business-logic details in dynamic filters has gone too far, and we should be
using proper ids instead.
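To make the id suggestion concrete, here is a minimal sketch, not the PR's code. `ToyDynamicFilter`, `Inner`, and `NEXT_FILTER_ID` are hypothetical names, and the process-local counter is an assumption: a real design would likely need ids that stay unique across processes (for instance randomly generated ones).

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical process-wide counter used to hand out filter ids.
/// Assumption: uniqueness within one process is enough for this sketch.
static NEXT_FILTER_ID: AtomicU64 = AtomicU64::new(0);

/// Hypothetical stand-in for the shared inner state of a dynamic filter.
#[derive(Debug)]
pub struct Inner {
    /// Assigned once at creation; independent of where the state lives in memory.
    id: u64,
}

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`.
#[derive(Debug, Clone)]
pub struct ToyDynamicFilter {
    inner: Arc<Inner>,
}

impl ToyDynamicFilter {
    /// Creates a filter with fresh inner state and a fresh id.
    pub fn new() -> Self {
        let id = NEXT_FILTER_ID.fetch_add(1, Ordering::Relaxed);
        Self { inner: Arc::new(Inner { id }) }
    }

    /// Explicit id of the shared inner state; no pointer casts involved.
    pub fn inner_id(&self) -> u64 {
        self.inner.id
    }

    /// Number of `Arc` handles to the inner state. Cloning the filter for any
    /// reason bumps this, so it says nothing about whether the filter is "used".
    pub fn inner_strong_count(&self) -> usize {
        Arc::strong_count(&self.inner)
    }
}
```

Clones of one filter report the same `inner_id()`, separately created filters report different ones, and the strong count rises on every clone regardless of whether anything consumes the filter.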
##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -256,6 +258,47 @@ pub fn serialize_physical_expr_with_converter(
codec: &dyn PhysicalExtensionCodec,
proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<protobuf::PhysicalExprNode> {
+ // Check for DynamicFilterPhysicalExpr before snapshotting.
+ // We need to handle it before snapshot_physical_expr because snapshot()
+ // replaces the DynamicFilterPhysicalExpr with its inner expression.
+ if let Some(df) = value.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
Review Comment:
I see one of the main issues is how snapshotting works with dynamic
filtering.
In the physical expression trait:
```rust
...
/// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
/// or needs to serialize this state to bytes may not be able to handle these dynamic references.
/// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
/// contain these dynamic references.
...
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>>;
```
This doesn't return a snapshot of the `DynamicFilterPhysicalExpr`; it
returns a snapshot of the inner computed physical expression, losing all the
information of the original `DynamicFilterPhysicalExpr`.
What we want in this PR is to serialize the actual
`DynamicFilterPhysicalExpr`, with all its details, so probably
`snapshot_physical_expr(Arc::clone(value))?;` should not be called at
all for any expression, and we should be able to treat the dynamic filter as
just another serializable `PhysicalExpr`.
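A toy model of the problem, as a sketch only: `ToyExpr`, `Literal`, and `ToyDynamicFilter` are hypothetical stand-ins and do not mirror the real DataFusion trait beyond the general shape of `snapshot`.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical, heavily simplified stand-in for `PhysicalExpr`.
pub trait ToyExpr {
    fn describe(&self) -> String;

    /// Same general shape as `snapshot`: `None` means "nothing dynamic here".
    fn snapshot(&self) -> Option<Arc<dyn ToyExpr>> {
        None
    }
}

/// A plain expression with no dynamic state.
pub struct Literal(pub i64);

impl ToyExpr for Literal {
    fn describe(&self) -> String {
        format!("lit({})", self.0)
    }
}

/// Hypothetical dynamic filter wrapping an expression that may be updated later.
pub struct ToyDynamicFilter {
    current: RwLock<Arc<dyn ToyExpr>>,
}

impl ToyDynamicFilter {
    pub fn new(expr: Arc<dyn ToyExpr>) -> Self {
        Self { current: RwLock::new(expr) }
    }
}

impl ToyExpr for ToyDynamicFilter {
    fn describe(&self) -> String {
        let guard = self.current.read().unwrap();
        format!("dynamic({})", guard.describe())
    }

    /// Returns only the current inner expression; the wrapper itself is gone.
    fn snapshot(&self) -> Option<Arc<dyn ToyExpr>> {
        let guard = self.current.read().unwrap();
        Some(Arc::clone(&*guard))
    }
}
```

In this model, a serializer that always snapshots first only ever sees `lit(7)` and has no way to know that it came from a dynamic filter, which is the information loss described above.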
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -180,18 +291,52 @@ impl DynamicFilterPhysicalExpr {
}
}
+ /// Create a new [`DynamicFilterPhysicalExpr`] from `self`, except it overwrites the
+ /// internal state with the source filter's state.
+ ///
+ /// This is a low-level API intended for use by the proto deserialization layer.
+ ///
+ /// # Safety
Review Comment:
`# Safety` sections in Rust docs are typically reserved for functions that are
declared `unsafe`, where the dev needs to spell out the implicit safety
constraints that callers must uphold to use that function.
As this function is not `unsafe`, I'd suggest just choosing a different word.
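As a rough illustration of the convention (both functions and the `# Warning` heading are hypothetical examples, not taken from the PR):

```rust
/// Reads the first byte behind `ptr`.
///
/// # Safety
///
/// `ptr` must be non-null and point to a readable, initialized byte.
pub unsafe fn first_byte(ptr: *const u8) -> u8 {
    // SAFETY: the caller upholds the contract documented above.
    unsafe { *ptr }
}

/// Overwrites `state` with the contents of `source`.
///
/// # Warning
///
/// This is a safe function, so there is no memory-safety contract to document.
/// The caveat is purely logical: any previous contents of `state` are lost.
pub fn overwrite_state(state: &mut Vec<i64>, source: &[i64]) {
    state.clear();
    state.extend_from_slice(source);
}
```

Headings such as `# Warning`, `# Invariants`, or `# Notes` would convey the low-level nature of a safe function without suggesting a memory-safety contract.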
##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -256,6 +258,47 @@ pub fn serialize_physical_expr_with_converter(
codec: &dyn PhysicalExtensionCodec,
proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<protobuf::PhysicalExprNode> {
+ // Check for DynamicFilterPhysicalExpr before snapshotting.
+ // We need to handle it before snapshot_physical_expr because snapshot()
+ // replaces the DynamicFilterPhysicalExpr with its inner expression.
+ if let Some(df) = value.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
Review Comment:
Doubling down on the above, the fact that dynamic filters cannot be treated
as normal expressions makes me think that there might be better ways of dealing
with them.
Maybe there's an opportunity for making them normal expressions again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]