zhuqi-lucas opened a new pull request, #22922:
URL: https://github.com/apache/datafusion/pull/22922

   ## Which issue does this PR close?
   
   Closes #22920.
   
   ## Rationale for this change
   
   #21807 introduced the `DynamicFilterPhysicalExpr` dedup pipeline so that 
identical references on the wire reconstruct to one shared `Arc<Inner>` on the 
decode side via `expression_id` cache keys. The pipeline works end-to-end for 
plans built from upstream types, and #22011 hooked it through `SortExec` / 
`AggregateExec` / `HashJoinExec` plan codecs.
   
   However, the pipeline stops at the `PhysicalExtensionCodec` boundary. 
Downstream users with custom file sources or plan extension types encode/decode 
their nested `PhysicalExprNode` fields via `try_encode_expr` / 
`try_decode_expr`, which did not receive the active 
`PhysicalProtoConverterExtension`. The only entry points available inside such 
a codec were the free `serialize_physical_expr` / `parse_physical_expr` 
helpers, hardwired to `DefaultPhysicalProtoConverter`. So any expression inside 
an extension codec's serialized blob got `expr_id = None` on the wire, and the 
deduplicating deserializer's cache never hit for refs that crossed an extension 
boundary -- even when an outer `DeduplicatingProtoConverter` was in effect on 
the rest of the plan.
   
   Concretely: two references to the same `DynamicFilterPhysicalExpr` (one 
inside an extension expr, one outside) reconstructed as **distinct** `Inner` 
allocations after roundtrip, so heap-max updates from a TopK `SortExec` failed 
to propagate to a `FileScan` predicate carried by a custom file source.
   
   ## What changes are included in this PR?
   
   This matches the existing plan-level pattern -- `try_encode` and 
`try_decode` already take a `proto_converter` parameter, added when the plan 
codec was wired into the converter pipeline. This change extends the expr-level 
methods the same way:
   
   ```rust
   fn try_decode_expr(
       &self,
       buf: &[u8],
       inputs: &[Arc<dyn PhysicalExpr>],
       proto_converter: &dyn PhysicalProtoConverterExtension,  // new
   ) -> Result<Arc<dyn PhysicalExpr>>;
   
   fn try_encode_expr(
       &self,
       node: &Arc<dyn PhysicalExpr>,
       buf: &mut Vec<u8>,
       proto_converter: &dyn PhysicalProtoConverterExtension,  // new
   ) -> Result<()>;
   ```
   
   The two internal call sites in `serialize_physical_expr_with_converter` and 
`parse_physical_expr_with_converter` are updated to thread the active converter 
through. Downstream codecs can now route nested expression encode/decode 
through `proto_converter.physical_expr_to_proto` / 
`proto_converter.proto_to_physical_expr`, picking up dedup automatically.
   
   ## Are these changes tested?
   
   Yes — a new `extension_codec_expr_participates_in_deduplication` 
proto-roundtrip test constructs a `BinaryExpr` whose left operand is a bare 
`DynamicFilterPhysicalExpr` and right operand is a custom `WrapperExpr` whose 
extension codec embeds the same dynamic filter inside its serialized blob. 
After a `DeduplicatingProtoConverter` roundtrip, an `update()` applied to the 
bare-side decoded filter is observed by `current()` on the wrapped-side decoded 
filter, proving both refs back the same `Inner`. Without the converter-aware 
extension codec the test fails because the two refs end up with distinct 
`Inner` allocations.
   
   ## Are there any user-facing changes?
   
   Yes -- this is a **breaking change** for downstream codecs that override 
`try_encode_expr` / `try_decode_expr`. They will need to add the new 
`proto_converter` parameter (and may name it `_proto_converter` if they don't 
carry nested expressions). Codecs that only override the plan-level 
`try_encode` / `try_decode` are unaffected.
   
   Wire format is unchanged (`PhysicalExprNode.expr_id` is unchanged); the fix 
is purely on the Rust trait API.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to