zhuqi-lucas opened a new issue, #22920:
URL: https://github.com/apache/datafusion/issues/22920
# `PhysicalExtensionCodec` cannot participate in `DynamicFilterPhysicalExpr` deduplication introduced by #21807
## Context
#21807's motivation was distributed execution (cf. `Informs:
datafusion-contrib/datafusion-distributed#180`). The dedup pipeline lets a
`SortExec(TopK)` running on one worker keep `Arc<DynamicFilterPhysicalExpr>`
identity with the FileScan predicate that `FilterPushdown` cloned from it, so
heap-max updates propagate to row-group pruning after the plan crosses a
network/proto boundary.
For distributed engines built on top of `datafusion-proto`
(datafusion-distributed, ballista, in-house distributed query engines), the win
only fully materializes when **every** plan and expression in the wire format
flows through the dedup pipeline. The current trait surface leaves a gap at
the extension-codec boundary, which is exactly where these engines plug in
their custom file sources and plan nodes.
## Problem
#21807 introduced a deduplication pipeline for `DynamicFilterPhysicalExpr`
across a `datafusion-proto` roundtrip:
- `DynamicFilterPhysicalExpr::Inner` carries an `expression_id` that is
preserved across `update()` and `with_new_children`.
- `DeduplicatingSerializer` stamps each emitted `PhysicalExprNode.expr_id`
from `expr.expression_id()`.
- `DeduplicatingDeserializer` caches by `expr_id` and reuses one `Arc<dyn
PhysicalExpr>` for every reference with a matching id.
Together with #22011 (Sort/Aggregate/HashJoin proto carrying their dynamic
filters), the pipeline works end-to-end for plans built from upstream-only
execution-plan and expression types: a `SortExec(TopK)` and a pushed-down
`DataSourceExec(ParquetSource)` reconstruct on the decode side with the SAME
`Arc<DynamicFilterPhysicalExpr>` shared between `SortExec.filter` and the
FileScan predicate, so heap-max updates propagate.
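The mechanism above can be modelled without any DataFusion types. What follows is a minimal, self-contained sketch, not the `datafusion-proto` implementation: `Expr`, `ExprNode`, `encode`, `Decoder` and `roundtrip` are hypothetical stand-ins for `DynamicFilterPhysicalExpr`, `PhysicalExprNode`, `DeduplicatingSerializer` and `DeduplicatingDeserializer`. Only the idea (stamp an id on encode, cache by id on decode) is taken from the description.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for an expression that carries a stable id.
#[derive(Debug)]
struct Expr {
    expression_id: u64,
    name: String,
}

/// Hypothetical stand-in for the wire node; `expr_id` is optional.
#[derive(Debug, Clone)]
struct ExprNode {
    expr_id: Option<u64>,
    name: String,
}

/// Encode side: stamp the emitted node with the expression's id.
fn encode(expr: &Arc<Expr>) -> ExprNode {
    ExprNode {
        expr_id: Some(expr.expression_id),
        name: expr.name.clone(),
    }
}

/// Decode side: cache rebuilt expressions by id.
#[derive(Default)]
struct Decoder {
    cache: HashMap<u64, Arc<Expr>>,
}

impl Decoder {
    fn decode(&mut self, node: &ExprNode) -> Arc<Expr> {
        match node.expr_id {
            // Every reference with the same id gets the same Arc.
            Some(id) => self
                .cache
                .entry(id)
                .or_insert_with(|| {
                    Arc::new(Expr { expression_id: id, name: node.name.clone() })
                })
                .clone(),
            // Without an id there is nothing to key the cache on.
            None => Arc::new(Expr { expression_id: 0, name: node.name.clone() }),
        }
    }
}

/// Encodes two references to one expression and decodes both.
fn roundtrip() -> (Arc<Expr>, Arc<Expr>) {
    let shared = Arc::new(Expr { expression_id: 7, name: "topk_filter".to_string() });
    let sort_filter = encode(&shared);
    let scan_predicate = encode(&shared);

    let mut decoder = Decoder::default();
    (decoder.decode(&sort_filter), decoder.decode(&scan_predicate))
}
```

In this model, the two values returned by `roundtrip()` point at the same allocation, which is the property the real pipeline needs so that updates made through one reference are visible through the other.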
However, the same is not true once a `PhysicalExtensionCodec` is in the
path. Downstream users that define custom plan / file-source types (anyone with
a `try_encode` / `try_decode` impl on the extension trait whose proto carries a
nested `PhysicalExprNode` for, e.g., a predicate) cannot participate in the
dedup pipeline. Looking at the relevant trait surface in `datafusion-proto`:
```rust
pub trait PhysicalExtensionCodec {
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn try_encode_expr(&self, node: &Arc<dyn PhysicalExpr>, buf: &mut Vec<u8>) -> Result<()>;

    fn try_decode_expr(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>>;

    // ...
}
```
None of these receive the active `&dyn PhysicalProtoConverterExtension`.
When a custom codec needs to serialize a nested `PhysicalExprNode` inside its
proto (for example a predicate field on a custom file-source node), the only
entry point it can call is `serialize_physical_expr(value, codec)`, which is
hard-wired to `DefaultPhysicalProtoConverter`. The same holds on the decode
side: `parse_physical_expr(node, ctx, schema, codec)` always uses the default
converter.
The consequence is that even when the outer
`PhysicalPlanNode::try_from_physical_plan_with_converter(...,
&DeduplicatingSerializer::new())` is in effect, any `PhysicalExprNode` reached
through an extension codec gets `expr_id: None` on the wire. The matching
`DeduplicatingDeserializer` cache never finds a hit for those refs, two
distinct `Arc`s are rebuilt on the decode side, and the dedup story stops at
the extension boundary.
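To make the encode-side gap concrete, here is a small stand-alone model. It is a sketch under stated assumptions and does not use the real `datafusion-proto` API: `Converter`, `DefaultConverter`, `DedupConverter`, `serialize_expr`, `CustomCodec::encode_predicate` and `encode_plan` are hypothetical names that mirror the roles of `PhysicalProtoConverterExtension`, `DefaultPhysicalProtoConverter`, `DeduplicatingSerializer`, `serialize_physical_expr`, a custom codec's `try_encode`, and the outer plan serialization.

```rust
use std::sync::Arc;

/// Hypothetical expression with a stable id.
struct Expr {
    expression_id: u64,
}

/// Hypothetical wire node.
#[derive(Debug, PartialEq)]
struct ExprNode {
    expr_id: Option<u64>,
}

/// Hypothetical stand-in for the converter extension trait.
trait Converter {
    fn expr_to_proto(&self, expr: &Arc<Expr>) -> ExprNode;
}

/// Models the default converter: it does not stamp ids.
struct DefaultConverter;
impl Converter for DefaultConverter {
    fn expr_to_proto(&self, _expr: &Arc<Expr>) -> ExprNode {
        ExprNode { expr_id: None }
    }
}

/// Models the deduplicating serializer: it stamps ids.
struct DedupConverter;
impl Converter for DedupConverter {
    fn expr_to_proto(&self, expr: &Arc<Expr>) -> ExprNode {
        ExprNode { expr_id: Some(expr.expression_id) }
    }
}

/// Models the free function a codec can call today: always the default converter.
fn serialize_expr(expr: &Arc<Expr>) -> ExprNode {
    DefaultConverter.expr_to_proto(expr)
}

/// Models an extension codec whose encode method receives no converter.
struct CustomCodec;
impl CustomCodec {
    fn encode_predicate(&self, predicate: &Arc<Expr>) -> ExprNode {
        // The active converter is out of reach here.
        serialize_expr(predicate)
    }
}

/// Models the outer plan serialization: the sort filter goes through the
/// active converter, the scan predicate goes through the codec.
fn encode_plan(
    converter: &dyn Converter,
    codec: &CustomCodec,
    shared: &Arc<Expr>,
) -> (ExprNode, ExprNode) {
    let sort_filter = converter.expr_to_proto(shared);
    let scan_predicate = codec.encode_predicate(shared);
    (sort_filter, scan_predicate)
}
```

Calling `encode_plan(&DedupConverter, &CustomCodec, &shared)` in this model yields a stamped node for the sort filter and an unstamped one for the scan predicate, even though both came from the same `Arc`.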
The wire format already has room for this
(`PhysicalExprNode.expr_id` is `optional uint64`); the gap is purely on the
Rust trait API.
## Demonstration sketch
```rust
// A minimal custom codec whose proto has a nested PhysicalExprNode predicate.
struct CustomCodec { /* ... */ }

impl PhysicalExtensionCodec for CustomCodec {
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
        let exec = node.as_any().downcast_ref::<CustomFileScan>().unwrap();
        let predicate = exec
            .predicate()
            // Forced to DefaultPhysicalProtoConverter.
            .map(|p| serialize_physical_expr(p, self))
            .transpose()?;
        let proto = CustomFileScanNode {
            // ...
            // PhysicalExprNode with expr_id == None, even under DeduplicatingSerializer.
            predicate,
        };
        proto.encode(buf)?;
        Ok(())
    }
    // try_decode symmetrically uses parse_physical_expr (also default-only)
}

// Build a plan where SortExec.filter and CustomFileScan.predicate share one
// Arc<DynamicFilterPhysicalExpr>, then:
let serializer = DeduplicatingSerializer::new();
let proto = serializer.execution_plan_to_proto(&plan, &CustomCodec { /* ... */ })?;
let bytes = proto.encode_to_vec();
let parsed = PhysicalPlanNode::decode(bytes.as_slice())?;
let deserializer = DeduplicatingDeserializer::new();
let decoded =
    deserializer.proto_to_execution_plan(&ctx, &CustomCodec { /* ... */ }, &parsed)?;

// Both Arcs are DynamicFilterPhysicalExpr, but they are different allocations.
assert!(!Arc::ptr_eq(decoded_sort_filter, decoded_filescan_predicate));
```
## Proposed fix
Extend `PhysicalExtensionCodec` with `_with_converter` variants whose
default implementations forward to the existing methods (so every downstream
impl keeps working untouched):
```rust
pub trait PhysicalExtensionCodec {
    // ...existing methods unchanged...

    fn try_encode_expr_with_converter(
        &self,
        node: &Arc<dyn PhysicalExpr>,
        buf: &mut Vec<u8>,
        _proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<()> {
        self.try_encode_expr(node, buf)
    }

    fn try_decode_expr_with_converter(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn PhysicalExpr>],
        _proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.try_decode_expr(buf, inputs)
    }

    fn try_encode_with_converter(
        &self,
        node: Arc<dyn ExecutionPlan>,
        buf: &mut Vec<u8>,
        _proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<()> {
        self.try_encode(node, buf)
    }

    fn try_decode_with_converter(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
        _proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.try_decode(buf, inputs, ctx)
    }
}
```
Then plumb the converter through the following call sites in `datafusion-proto`:
1. `serialize_physical_expr_with_converter` in `to_proto.rs` — the
`Extension` fallback that calls `codec.try_encode_expr` should call
`codec.try_encode_expr_with_converter(_, _, proto_converter)`.
2. `parse_physical_expr_with_converter` in `from_proto.rs` — the
`ExprType::Extension` arm that calls `codec.try_decode_expr` should call
`codec.try_decode_expr_with_converter(_, _, proto_converter)`.
3. The extension-plan roundtrip in `physical_plan/mod.rs` — both the encode
side (`codec.try_encode`) and decode side (`codec.try_decode`) of the
`Extension` `PhysicalPlanNode` variant should call the new `_with_converter`
methods.
Downstream codecs that override the new methods can now write:
```rust
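// Plan-level override: the body downcasts a custom plan node, so this is the
// `try_encode_with_converter` variant rather than the expression-level one.
fn try_encode_with_converter(
    &self,
    node: Arc<dyn ExecutionPlan>,
    buf: &mut Vec<u8>,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
    let exec = /* downcast `node` to my custom type */;
    let predicate = exec
        .predicate()
        // Now dedup-aware: the nested expr goes through the active converter.
        .map(|p| proto_converter.physical_expr_to_proto(p, self))
        .transpose()?;
    // ...
}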
```
The `DeduplicatingSerializer` / `DeduplicatingDeserializer` cache then extends
naturally through the extension boundary.
## Why this is backward-compatible
- Default implementations of the new methods forward to the existing
methods, so any current `PhysicalExtensionCodec` impl keeps compiling and
behaving identically. No wire-format change (the `expr_id` field already exists
on `PhysicalExprNode`).
- Downstream users opt in by overriding the new methods. Until they do,
behavior is exactly what it is today.
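The compatibility argument rests on ordinary Rust default trait methods. The following stand-alone sketch illustrates it with hypothetical types (`Converter`, `Dedup`, `Codec`, `LegacyCodec`, `AwareCodec` and `encode_via_new_entry_point` are made up for this example and are not part of `datafusion-proto`): a codec written before the new method existed still compiles and produces the same bytes when the framework calls the new entry point, while a codec that overrides it can use the converter.

```rust
/// Hypothetical converter; `tag` only exists to make the effect observable.
trait Converter {
    fn tag(&self) -> &'static str;
}

struct Dedup;
impl Converter for Dedup {
    fn tag(&self) -> &'static str {
        "dedup"
    }
}

/// Hypothetical codec trait with one existing method and one new method
/// whose default implementation forwards to the existing one.
trait Codec {
    fn try_encode(&self, buf: &mut Vec<u8>) -> Result<(), String>;

    fn try_encode_with_converter(
        &self,
        buf: &mut Vec<u8>,
        _converter: &dyn Converter,
    ) -> Result<(), String> {
        self.try_encode(buf)
    }
}

/// Written against the old trait surface: implements only `try_encode`.
struct LegacyCodec;
impl Codec for LegacyCodec {
    fn try_encode(&self, buf: &mut Vec<u8>) -> Result<(), String> {
        buf.extend_from_slice(b"legacy");
        Ok(())
    }
}

/// Opts in by overriding the new method.
struct AwareCodec;
impl Codec for AwareCodec {
    fn try_encode(&self, buf: &mut Vec<u8>) -> Result<(), String> {
        buf.extend_from_slice(b"default");
        Ok(())
    }

    fn try_encode_with_converter(
        &self,
        buf: &mut Vec<u8>,
        converter: &dyn Converter,
    ) -> Result<(), String> {
        buf.extend_from_slice(converter.tag().as_bytes());
        Ok(())
    }
}

/// Models the framework after the change: it always calls the new method.
fn encode_via_new_entry_point(codec: &dyn Codec) -> Vec<u8> {
    let mut buf = Vec::new();
    codec
        .try_encode_with_converter(&mut buf, &Dedup)
        .expect("encoding into a Vec should not fail in this sketch");
    buf
}
```

Here `LegacyCodec` still emits `legacy` through the forwarding default, and only `AwareCodec` sees the converter.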
## Scope check
I have a working local implementation of the above against a stable release
branch and a downstream codec that exercises it (custom file source with a
`DynamicFilterPhysicalExpr` predicate). Happy to open a PR if the direction is
agreeable; first wanted to surface the design here in case there's an
alternative being considered (e.g., a wrapper type around the codec instead of
new trait methods, or threading the converter through a context parameter).