zhuqi-lucas opened a new issue, #22920:
URL: https://github.com/apache/datafusion/issues/22920

   # `PhysicalExtensionCodec` cannot participate in `DynamicFilterPhysicalExpr` 
deduplication introduced by #21807
   
   ## Context
   
   #21807's motivation was distributed execution (cf. `Informs: 
datafusion-contrib/datafusion-distributed#180`). The dedup pipeline lets a 
`SortExec(TopK)` running on one worker keep `Arc<DynamicFilterPhysicalExpr>` 
identity with the FileScan predicate that `FilterPushdown` cloned from it, so 
heap-max updates propagate to row-group pruning after the plan crosses a 
network/proto boundary.
   
   For distributed engines built on top of `datafusion-proto` 
(datafusion-distributed, ballista, in-house distributed query engines), the win 
only fully materializes when **every** plan and expression in the wire format 
flows through the dedup pipeline. The current trait surface leaves a gap 
precisely at the extension-codec boundary, which is exactly where these engines 
plug in their custom file sources and plan nodes.
   
   ## Problem
   
   #21807 introduced a deduplication pipeline for `DynamicFilterPhysicalExpr` 
across `datafusion-proto` roundtrip:
   
   - `DynamicFilterPhysicalExpr::Inner` carries an `expression_id` that is 
preserved across `update()` and `with_new_children`.
   - `DeduplicatingSerializer` stamps each emitted `PhysicalExprNode.expr_id` 
from `expr.expression_id()`.
   - `DeduplicatingDeserializer` caches by `expr_id` and reuses one `Arc<dyn 
PhysicalExpr>` for every reference with a matching id.
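
As a rough illustration of the three pieces above, the following std-only sketch models the id stamping and the cache-by-id behavior with toy types. `ToyExpr`, `ToyExprNode`, `serialize_dedup`, and `ToyDedupDeserializer` are hypothetical stand-ins invented for this sketch, not the `datafusion-proto` types, and the real implementation may differ in detail.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Toy stand-in for an in-memory expression; `id` plays the role of `expression_id`.
#[derive(Debug)]
struct ToyExpr {
    id: u64,
    name: String,
}

/// Toy stand-in for `PhysicalExprNode`; `expr_id` is optional on the wire.
#[derive(Debug, Clone, PartialEq)]
struct ToyExprNode {
    expr_id: Option<u64>,
    name: String,
}

/// Encode side: stamp the wire node with the expression's id.
fn serialize_dedup(expr: &Arc<ToyExpr>) -> ToyExprNode {
    ToyExprNode { expr_id: Some(expr.id), name: expr.name.clone() }
}

/// Decode side: cache by `expr_id` and hand out one `Arc` per id.
#[derive(Default)]
struct ToyDedupDeserializer {
    cache: HashMap<u64, Arc<ToyExpr>>,
}

impl ToyDedupDeserializer {
    fn deserialize(&mut self, node: &ToyExprNode) -> Arc<ToyExpr> {
        match node.expr_id {
            // Known id: reuse the cached Arc, or build and cache it on first sight.
            Some(id) => self
                .cache
                .entry(id)
                .or_insert_with(|| Arc::new(ToyExpr { id, name: node.name.clone() }))
                .clone(),
            // No id on the wire: nothing to key on, so a fresh Arc is built
            // (the id 0 is an arbitrary placeholder in this toy model).
            None => Arc::new(ToyExpr { id: 0, name: node.name.clone() }),
        }
    }
}

/// Serializes one shared expression twice (as two plan nodes would reference it)
/// and reports whether both decoded references point at the same allocation.
fn roundtrip_shares_identity(stamp_ids: bool) -> bool {
    let filter = Arc::new(ToyExpr { id: 7, name: "dynamic_filter".into() });
    let mut sort_ref = serialize_dedup(&filter);
    let mut scan_ref = serialize_dedup(&filter);
    if !stamp_ids {
        // Model a serializer that never sets `expr_id`.
        sort_ref.expr_id = None;
        scan_ref.expr_id = None;
    }
    let mut de = ToyDedupDeserializer::default();
    let a = de.deserialize(&sort_ref);
    let b = de.deserialize(&scan_ref);
    Arc::ptr_eq(&a, &b)
}
```

In this model, `roundtrip_shares_identity(true)` yields a single shared `Arc`, while `roundtrip_shares_identity(false)` yields two separate allocations, which is the situation the rest of this issue is about.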
   
   Together with #22011 (Sort/Aggregate/HashJoin proto carrying their dynamic 
filters), the pipeline works end-to-end for plans built from upstream-only 
execution-plan and expression types: a `SortExec(TopK)` and a pushed-down 
`DataSourceExec(ParquetSource)` reconstruct on the decode side with the SAME 
`Arc<DynamicFilterPhysicalExpr>` shared between `SortExec.filter` and the 
FileScan predicate, so heap-max updates propagate.
   
   However, the same is not true once a `PhysicalExtensionCodec` is in the 
path. Downstream users that define custom plan / file-source types (anyone with 
a `try_encode` / `try_decode` impl on the extension trait whose proto carries a 
nested `PhysicalExprNode` for, e.g., a predicate) cannot participate in the 
dedup pipeline. Looking at the relevant trait surface in `datafusion-proto`:
   
   ```rust
   pub trait PhysicalExtensionCodec {
       fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
       fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> Result<Arc<dyn ExecutionPlan>>;
       fn try_encode_expr(&self, node: &Arc<dyn PhysicalExpr>, buf: &mut Vec<u8>) -> Result<()>;
       fn try_decode_expr(&self, buf: &[u8], inputs: &[Arc<dyn PhysicalExpr>]) -> Result<Arc<dyn PhysicalExpr>>;
       // ...
   }
   ```
   
   None of these methods receives the active `&dyn PhysicalProtoConverterExtension`. 
When a custom codec needs to serialize a nested `PhysicalExprNode` inside its 
proto (for example, a predicate field on a custom file-source node), the only 
entry point it can call is `serialize_physical_expr(value, codec)`, which is 
hard-wired to `DefaultPhysicalProtoConverter`. The same holds on the decode 
side: `parse_physical_expr(node, ctx, schema, codec)` always uses the default 
converter.
   
   The consequence is that even when the outer 
`PhysicalPlanNode::try_from_physical_plan_with_converter(..., 
&DeduplicatingSerializer::new())` is in effect, any `PhysicalExprNode` reached 
through an extension codec gets `expr_id: None` on the wire. The matching 
`DeduplicatingDeserializer` cache never finds a hit for those refs, two 
distinct `Arc`s are rebuilt on the decode side, and the dedup story stops at 
the extension boundary.
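
The gap can be modeled in a few lines of std-only Rust. This is only a sketch under stated assumptions: `ToyConverter`, `DefaultConverter`, `DedupConverter`, `serialize_expr_default`, and `codec_encode_predicate` are hypothetical names that mimic the shape of the real API (a converter trait, a free function pinned to the default converter, and a codec method with no converter argument); none of them exist in `datafusion-proto`.

```rust
use std::sync::Arc;

/// Toy expression carrying only its id.
struct ToyExpr {
    id: u64,
}

/// Toy wire node with an optional id, like `PhysicalExprNode.expr_id`.
#[derive(Debug, PartialEq)]
struct ToyExprNode {
    expr_id: Option<u64>,
}

/// Hypothetical stand-in for the converter extension trait.
trait ToyConverter {
    fn expr_to_proto(&self, expr: &Arc<ToyExpr>) -> ToyExprNode;
}

/// Models the default converter: never stamps an id.
struct DefaultConverter;
impl ToyConverter for DefaultConverter {
    fn expr_to_proto(&self, _expr: &Arc<ToyExpr>) -> ToyExprNode {
        ToyExprNode { expr_id: None }
    }
}

/// Models the deduplicating converter: stamps the expression's id.
struct DedupConverter;
impl ToyConverter for DedupConverter {
    fn expr_to_proto(&self, expr: &Arc<ToyExpr>) -> ToyExprNode {
        ToyExprNode { expr_id: Some(expr.id) }
    }
}

/// Models a free function that is hard-wired to the default converter.
fn serialize_expr_default(expr: &Arc<ToyExpr>) -> ToyExprNode {
    DefaultConverter.expr_to_proto(expr)
}

/// Models an extension codec today: it is handed no converter, so the
/// default-only free function is the only thing it can call.
fn codec_encode_predicate(predicate: &Arc<ToyExpr>) -> ToyExprNode {
    serialize_expr_default(predicate)
}

/// Encodes one shared expression twice: once directly through the active
/// converter (upstream node) and once through the codec (custom node).
fn encode_plan(active: &dyn ToyConverter, shared: &Arc<ToyExpr>) -> (ToyExprNode, ToyExprNode) {
    let sort_filter = active.expr_to_proto(shared);
    let scan_predicate = codec_encode_predicate(shared);
    (sort_filter, scan_predicate)
}

/// Runs the model with the deduplicating converter active.
fn demo_gap() -> (ToyExprNode, ToyExprNode) {
    let shared = Arc::new(ToyExpr { id: 42 });
    encode_plan(&DedupConverter, &shared)
}
```

Even though `DedupConverter` is the active converter in `demo_gap`, only the first node carries `Some(42)`; the node produced inside the codec carries `None`, so a decode-side cache keyed by id has nothing to match on.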
   
   The wire format already has the bandwidth for this 
(`PhysicalExprNode.expr_id` is `optional uint64`); the gap is purely on the 
Rust trait API.
   
   ## Demonstration sketch
   
   ```rust
   // A minimal custom codec whose proto has a nested PhysicalExprNode predicate.
   struct CustomCodec { /* ... */ }
   impl PhysicalExtensionCodec for CustomCodec {
       fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
           let exec = node.as_any().downcast_ref::<CustomFileScan>().unwrap();
           let predicate = exec
               .predicate()
               .map(|p| serialize_physical_expr(p, self)) // <-- forced to DefaultPhysicalProtoConverter
               .transpose()?;
           let proto = CustomFileScanNode {
               // ...
               predicate, // PhysicalExprNode with expr_id == None even under DeduplicatingSerializer
           };
           proto.encode(buf)?;
           Ok(())
       }
       // try_decode symmetrically uses parse_physical_expr (also default-only)
   }
   
   // Build a plan where SortExec.filter and CustomFileScan.predicate share one
   // Arc<DynamicFilterPhysicalExpr>, then:
   let serializer = DeduplicatingSerializer::new();
   let proto = serializer.execution_plan_to_proto(&plan, &CustomCodec { /* ... */ })?;
   let bytes = proto.encode_to_vec();
   let parsed = PhysicalPlanNode::decode(bytes.as_slice())?;
   let deserializer = DeduplicatingDeserializer::new();
   let decoded = deserializer.proto_to_execution_plan(&ctx, &CustomCodec { /* ... */ }, &parsed)?;
   
   // Both Arcs are DynamicFilterPhysicalExpr, but they are different allocations.
   assert!(!Arc::ptr_eq(decoded_sort_filter, decoded_filescan_predicate));
   ```
   
   ## Proposed fix
   
   Extend `PhysicalExtensionCodec` with `_with_converter` variants whose 
default implementations forward to the existing methods (so every downstream 
impl keeps working untouched):
   
   ```rust
   pub trait PhysicalExtensionCodec {
       // ...existing methods unchanged...
   
       fn try_encode_expr_with_converter(
           &self,
           node: &Arc<dyn PhysicalExpr>,
           buf: &mut Vec<u8>,
           _proto_converter: &dyn PhysicalProtoConverterExtension,
       ) -> Result<()> {
           self.try_encode_expr(node, buf)
       }
   
       fn try_decode_expr_with_converter(
           &self,
           buf: &[u8],
           inputs: &[Arc<dyn PhysicalExpr>],
           _proto_converter: &dyn PhysicalProtoConverterExtension,
       ) -> Result<Arc<dyn PhysicalExpr>> {
           self.try_decode_expr(buf, inputs)
       }
   
       fn try_encode_with_converter(
           &self,
           node: Arc<dyn ExecutionPlan>,
           buf: &mut Vec<u8>,
           _proto_converter: &dyn PhysicalProtoConverterExtension,
       ) -> Result<()> {
           self.try_encode(node, buf)
       }
   
       fn try_decode_with_converter(
           &self,
           buf: &[u8],
           inputs: &[Arc<dyn ExecutionPlan>],
           ctx: &TaskContext,
           _proto_converter: &dyn PhysicalProtoConverterExtension,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           self.try_decode(buf, inputs, ctx)
       }
   }
   ```
   
   Then plumb the converter through the three call sites in `datafusion-proto`:
   
   1. `serialize_physical_expr_with_converter` in `to_proto.rs` — the 
`Extension` fallback that calls `codec.try_encode_expr` should call 
`codec.try_encode_expr_with_converter(_, _, proto_converter)`.
   2. `parse_physical_expr_with_converter` in `from_proto.rs` — the 
`ExprType::Extension` arm that calls `codec.try_decode_expr` should call 
`codec.try_decode_expr_with_converter(_, _, proto_converter)`.
   3. The extension-plan roundtrip in `physical_plan/mod.rs` — both the encode 
side (`codec.try_encode`) and decode side (`codec.try_decode`) of the 
`Extension` `PhysicalPlanNode` variant should call the new `_with_converter` 
methods.
   
   Downstream codecs that override the new methods can now write:
   
   ```rust
   fn try_encode_with_converter(
       &self,
       node: Arc<dyn ExecutionPlan>,
       buf: &mut Vec<u8>,
       proto_converter: &dyn PhysicalProtoConverterExtension,
   ) -> Result<()> {
       let exec = /* downcast to my custom type */;
       let predicate = exec
           .predicate()
           .map(|p| proto_converter.physical_expr_to_proto(p, self)) // <-- now dedup-aware
           .transpose()?;
       // ...
   }
   ```
   
   With that, the ids stamped by `DeduplicatingSerializer` and the cache in 
`DeduplicatingDeserializer` extend naturally across the extension boundary.
   
   ## Why this is backward-compatible
   
   - Default implementations of the new methods forward to the existing 
methods, so any current `PhysicalExtensionCodec` impl keeps compiling and 
behaving identically. No wire-format change (the `expr_id` field already exists 
on `PhysicalExprNode`).
   - Downstream users opt in by overriding the new methods. Until they do, 
behavior is exactly what it is today.
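
The compatibility argument rests on ordinary Rust default trait methods. A minimal std-only sketch of that mechanism follows; `ToyCodec`, `ToyConverter`, `Dedup`, `LegacyCodec`, and `OptInCodec` are hypothetical names used only here, and the byte payloads are arbitrary markers rather than anything `datafusion-proto` emits.

```rust
/// Hypothetical stand-in for the converter handle passed to the new methods.
trait ToyConverter {
    fn name(&self) -> &'static str;
}

struct Dedup;
impl ToyConverter for Dedup {
    fn name(&self) -> &'static str {
        "dedup"
    }
}

/// Toy codec trait: one existing required method, one new defaulted method.
trait ToyCodec {
    /// Existing method that every current impl already provides.
    fn try_encode_expr(&self, buf: &mut Vec<u8>) -> Result<(), String>;

    /// New method: the default ignores the converter and forwards to the
    /// existing method, so impls written before it existed behave as before.
    fn try_encode_expr_with_converter(
        &self,
        buf: &mut Vec<u8>,
        _converter: &dyn ToyConverter,
    ) -> Result<(), String> {
        self.try_encode_expr(buf)
    }
}

/// An impl written before the new method existed: compiles unchanged.
struct LegacyCodec;
impl ToyCodec for LegacyCodec {
    fn try_encode_expr(&self, buf: &mut Vec<u8>) -> Result<(), String> {
        buf.extend_from_slice(b"legacy");
        Ok(())
    }
}

/// An impl that opts in by overriding the new method and using the converter.
struct OptInCodec;
impl ToyCodec for OptInCodec {
    fn try_encode_expr(&self, buf: &mut Vec<u8>) -> Result<(), String> {
        buf.extend_from_slice(b"plain");
        Ok(())
    }

    fn try_encode_expr_with_converter(
        &self,
        buf: &mut Vec<u8>,
        converter: &dyn ToyConverter,
    ) -> Result<(), String> {
        buf.extend_from_slice(converter.name().as_bytes());
        Ok(())
    }
}

/// Models the library call site after plumbing: it always calls the new method.
fn encode_via_new_entry_point(codec: &dyn ToyCodec) -> Vec<u8> {
    let mut buf = Vec::new();
    codec
        .try_encode_expr_with_converter(&mut buf, &Dedup)
        .expect("toy encoders never fail");
    buf
}
```

Here the call site always goes through the new method: `LegacyCodec` still produces its old bytes via the forwarding default, while `OptInCodec` sees the converter and can use it.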
   
   ## Scope check
   
   I have a working local implementation of the above against a stable release 
branch and a downstream codec that exercises it (custom file source with a 
`DynamicFilterPhysicalExpr` predicate). Happy to open a PR if the direction is 
agreeable; first wanted to surface the design here in case there's an 
alternative being considered (e.g., a wrapper type around the codec instead of 
new trait methods, or threading the converter through a context parameter).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

