This is an automated email from the ASF dual-hosted git repository.

blaginin pushed a commit to branch annarose/dict-coercion
in repository https://gitbox.apache.org/repos/asf/datafusion-sandbox.git

commit 66ee0afcabfd59323eeb7b5f76f60a80881bbba3
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Mon Feb 2 08:04:11 2026 -0600

    Preserve PhysicalExpr graph in proto round trip using Arc pointers as unique identifiers (#20037)
    
    Replaces #18192 using the APIs in #19437.
    
    Similar to #18192 the end goal here is specifically to enable
    deduplication of `DynamicFilterPhysicalExpr` so that distributed query
    engines can get one step closer to using dynamic filters.
    
    Because it is actually simpler, we apply this deduplication to all
    `PhysicalExpr`s. As an added benefit, this more faithfully preserves
    the original expression graph (instead of duplicating shared branches),
    which immediately avoids, e.g., duplicating large `InListExpr`s.
---
 Cargo.lock                                         |   1 +
 datafusion/proto/Cargo.toml                        |   1 +
 datafusion/proto/proto/datafusion.proto            |   8 +
 datafusion/proto/src/generated/pbjson.rs           |  22 +
 datafusion/proto/src/generated/prost.rs            |   8 +
 datafusion/proto/src/physical_plan/mod.rs          | 216 ++++++++++
 datafusion/proto/src/physical_plan/to_proto.rs     |  18 +
 .../proto/tests/cases/roundtrip_physical_plan.rs   | 452 ++++++++++++++++++++-
 8 files changed, 724 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1f28687f4..e582b43c7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2514,6 +2514,7 @@ dependencies = [
  "pbjson 0.9.0",
  "pretty_assertions",
  "prost",
+ "rand 0.9.2",
  "serde",
  "serde_json",
  "tokio",
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index edc96d5a9..3d17ed30d 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -66,6 +66,7 @@ datafusion-proto-common = { workspace = true }
 object_store = { workspace = true }
 pbjson = { workspace = true, optional = true }
 prost = { workspace = true }
+rand = { workspace = true }
 serde = { version = "1.0", optional = true }
 serde_json = { workspace = true, optional = true }
 
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 59be5c578..2c29597f4 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -850,6 +850,14 @@ message PhysicalExprNode {
   // Was date_time_interval_expr
   reserved 17;
 
+  // Unique identifier for this expression to do deduplication during 
deserialization.
+  // When serializing, this is set to a unique identifier for each combination 
of
+  // expression, process and serialization run.
+  // When deserializing, if this ID has been seen before, the cached Arc is 
returned
+  // instead of creating a new one, enabling reconstruction of referential 
integrity
+  // across serde roundtrips.
+  optional uint64 expr_id = 30;
+
   oneof ExprType {
     // column references
     PhysicalColumn column = 1;
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 3873afcdc..5d8ee5b73 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -16029,10 +16029,18 @@ impl serde::Serialize for PhysicalExprNode {
     {
         use serde::ser::SerializeStruct;
         let mut len = 0;
+        if self.expr_id.is_some() {
+            len += 1;
+        }
         if self.expr_type.is_some() {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.PhysicalExprNode", len)?;
+        if let Some(v) = self.expr_id.as_ref() {
+            #[allow(clippy::needless_borrow)]
+            #[allow(clippy::needless_borrows_for_generic_args)]
+            struct_ser.serialize_field("exprId", 
ToString::to_string(&v).as_str())?;
+        }
         if let Some(v) = self.expr_type.as_ref() {
             match v {
                 physical_expr_node::ExprType::Column(v) => {
@@ -16104,6 +16112,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
         D: serde::Deserializer<'de>,
     {
         const FIELDS: &[&str] = &[
+            "expr_id",
+            "exprId",
             "column",
             "literal",
             "binary_expr",
@@ -16140,6 +16150,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
+            ExprId,
             Column,
             Literal,
             BinaryExpr,
@@ -16180,6 +16191,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
                         E: serde::de::Error,
                     {
                         match value {
+                            "exprId" | "expr_id" => Ok(GeneratedField::ExprId),
                             "column" => Ok(GeneratedField::Column),
                             "literal" => Ok(GeneratedField::Literal),
                             "binaryExpr" | "binary_expr" => 
Ok(GeneratedField::BinaryExpr),
@@ -16218,9 +16230,18 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode 
{
                 where
                     V: serde::de::MapAccess<'de>,
             {
+                let mut expr_id__ = None;
                 let mut expr_type__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
+                        GeneratedField::ExprId => {
+                            if expr_id__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("exprId"));
+                            }
+                            expr_id__ = 
+                                
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 x.0)
+                            ;
+                        }
                         GeneratedField::Column => {
                             if expr_type__.is_some() {
                                 return 
Err(serde::de::Error::duplicate_field("column"));
@@ -16357,6 +16378,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
                     }
                 }
                 Ok(PhysicalExprNode {
+                    expr_id: expr_id__,
                     expr_type: expr_type__,
                 })
             }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index 3806e31a4..18dabac51 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1280,6 +1280,14 @@ pub struct PhysicalExtensionNode {
 /// physical expressions
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalExprNode {
+    /// Unique identifier for this expression to do deduplication during 
deserialization.
+    /// When serializing, this is set to a unique identifier for each 
combination of
+    /// expression, process and serialization run.
+    /// When deserializing, if this ID has been seen before, the cached Arc is 
returned
+    /// instead of creating a new one, enabling reconstruction of referential 
integrity
+    /// across serde roundtrips.
+    #[prost(uint64, optional, tag = "30")]
+    pub expr_id: ::core::option::Option<u64>,
     #[prost(
         oneof = "physical_expr_node::ExprType",
         tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 
21"
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index e1f6381d1..2b805c3a2 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cell::RefCell;
+use std::collections::HashMap;
 use std::fmt::Debug;
+use std::hash::{DefaultHasher, Hash, Hasher};
 use std::sync::Arc;
 
 use arrow::compute::SortOptions;
@@ -2993,6 +2996,7 @@ impl protobuf::PhysicalPlanNode {
                     nulls_first: expr.options.nulls_first,
                 });
                 Ok(protobuf::PhysicalExprNode {
+                    expr_id: None,
                     expr_type: Some(ExprType::Sort(sort_expr)),
                 })
             })
@@ -3078,6 +3082,7 @@ impl protobuf::PhysicalPlanNode {
                     nulls_first: expr.options.nulls_first,
                 });
                 Ok(protobuf::PhysicalExprNode {
+                    expr_id: None,
                     expr_type: Some(ExprType::Sort(sort_expr)),
                 })
             })
@@ -3712,6 +3717,217 @@ impl PhysicalProtoConverterExtension for 
DefaultPhysicalProtoConverter {
     }
 }
 
+/// Internal serializer that adds an `expr_id` to every serialized expression.
+///
+/// Created fresh for each serialization operation, so its salt and the
+/// expressions it retains never outlive a single call.
+struct DeduplicatingSerializer {
+    /// Random salt hashed together with pointer addresses and the process ID to
+    /// create globally unique expr_ids.
+    session_id: u64,
+    /// Every expression passed to `physical_expr_to_proto` during this session.
+    ///
+    /// `expr_id` is derived from the `Arc`'s address, and an address is only
+    /// unique while its allocation is alive. The serializer merely borrows each
+    /// `Arc`, so if a caller dropped an expression before the session ended, its
+    /// address could be reused by an unrelated expression, which would then get
+    /// the same `expr_id` and be silently replaced by the first one on
+    /// deserialization. Holding a clone here rules that out.
+    retained: RefCell<Vec<Arc<dyn PhysicalExpr>>>,
+}
+
+impl DeduplicatingSerializer {
+    fn new() -> Self {
+        Self {
+            session_id: rand::random(),
+            retained: RefCell::new(Vec::new()),
+        }
+    }
+}
+
+impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
+    fn proto_to_execution_plan(
+        &self,
+        _ctx: &TaskContext,
+        _codec: &dyn PhysicalExtensionCodec,
+        _proto: &protobuf::PhysicalPlanNode,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        internal_err!("DeduplicatingSerializer cannot deserialize execution plans")
+    }
+
+    fn execution_plan_to_proto(
+        &self,
+        plan: &Arc<dyn ExecutionPlan>,
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<protobuf::PhysicalPlanNode>
+    where
+        Self: Sized,
+    {
+        // Recurse with `self` so the whole plan shares one session_id.
+        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
+            Arc::clone(plan),
+            codec,
+            self,
+        )
+    }
+
+    fn proto_to_physical_expr(
+        &self,
+        _proto: &protobuf::PhysicalExprNode,
+        _ctx: &TaskContext,
+        _input_schema: &Schema,
+        _codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<Arc<dyn PhysicalExpr>>
+    where
+        Self: Sized,
+    {
+        internal_err!("DeduplicatingSerializer cannot deserialize physical expressions")
+    }
+
+    fn physical_expr_to_proto(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<protobuf::PhysicalExprNode> {
+        let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?;
+
+        // Hash session_id, pointer address, and process ID together to create expr_id.
+        // - session_id: random per serializer, prevents collisions when merging
+        //   output produced by different serialization runs
+        // - ptr: unique per live Arc allocation (liveness ensured by `retained`)
+        // - pid: further separates addresses that came from different processes
+        let mut hasher = DefaultHasher::new();
+        self.session_id.hash(&mut hasher);
+        (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher);
+        std::process::id().hash(&mut hasher);
+        proto.expr_id = Some(hasher.finish());
+
+        // Keep the expression alive until the session ends so no other
+        // expression can be allocated at this address and reuse its expr_id.
+        self.retained.borrow_mut().push(Arc::clone(expr));
+
+        Ok(proto)
+    }
+}
+
+/// Internal deserializer that memoizes parsed expressions by their `expr_id`.
+///
+/// A new instance is built for every deserialization operation, so the cache
+/// only ever links expressions that came from the same serialized payload.
+#[derive(Default)]
+struct DeduplicatingDeserializer {
+    /// Already-parsed expressions, keyed by `expr_id`.
+    cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>,
+}
+
+impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
+    fn proto_to_execution_plan(
+        &self,
+        ctx: &TaskContext,
+        codec: &dyn PhysicalExtensionCodec,
+        proto: &protobuf::PhysicalPlanNode,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Recurse with `self` so child plans consult the same cache.
+        proto.try_into_physical_plan_with_converter(ctx, codec, self)
+    }
+
+    fn execution_plan_to_proto(
+        &self,
+        _plan: &Arc<dyn ExecutionPlan>,
+        _codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<protobuf::PhysicalPlanNode>
+    where
+        Self: Sized,
+    {
+        internal_err!("DeduplicatingDeserializer cannot serialize execution plans")
+    }
+
+    fn proto_to_physical_expr(
+        &self,
+        proto: &protobuf::PhysicalExprNode,
+        ctx: &TaskContext,
+        input_schema: &Schema,
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<Arc<dyn PhysicalExpr>>
+    where
+        Self: Sized,
+    {
+        // Nodes written without an id (expr_id == None) are never cached.
+        let Some(expr_id) = proto.expr_id else {
+            return parse_physical_expr_with_converter(
+                proto,
+                ctx,
+                input_schema,
+                codec,
+                self,
+            );
+        };
+
+        // The temporary `RefCell` borrow ends with this statement, before any
+        // recursive parse below needs to touch the cache again.
+        let hit = self.cache.borrow().get(&expr_id).map(Arc::clone);
+        if let Some(shared) = hit {
+            return Ok(shared);
+        }
+
+        let parsed = parse_physical_expr_with_converter(
+            proto,
+            ctx,
+            input_schema,
+            codec,
+            self,
+        )?;
+        self.cache.borrow_mut().insert(expr_id, Arc::clone(&parsed));
+        Ok(parsed)
+    }
+
+    fn physical_expr_to_proto(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<protobuf::PhysicalExprNode> {
+        internal_err!("DeduplicatingDeserializer cannot serialize physical expressions")
+    }
+}
+
+/// A proto converter that adds expression deduplication during serialization
+/// and deserialization.
+///
+/// During serialization, every expression is tagged with an `expr_id` computed by
+/// hashing (with `DefaultHasher`) a random per-operation `session_id`, the
+/// address of the expression's `Arc`, and the current process ID. The random
+/// salt keeps IDs from colliding when serialized plans produced by different
+/// serialization runs or processes are merged.
+///
+/// During deserialization, expressions with the same `expr_id` share the same
+/// `Arc`, reducing memory usage for plans with duplicate expressions (e.g., large
+/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`]
+/// instances.
+///
+/// This converter is stateless: each call creates its own internal serializer
+/// (with a fresh salt) or deserializer (with a fresh cache), so cached
+/// expressions are never shared between separate calls.
+///
+/// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html
+#[derive(Debug, Default, Clone, Copy)]
+pub struct DeduplicatingProtoConverter {}
+
+impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
+    fn proto_to_execution_plan(
+        &self,
+        ctx: &TaskContext,
+        codec: &dyn PhysicalExtensionCodec,
+        proto: &protobuf::PhysicalPlanNode,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // One cache for the whole plan, shared by all of its nodes.
+        let deserializer = DeduplicatingDeserializer::default();
+        proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer)
+    }
+
+    fn execution_plan_to_proto(
+        &self,
+        plan: &Arc<dyn ExecutionPlan>,
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<protobuf::PhysicalPlanNode>
+    where
+        Self: Sized,
+    {
+        // One salt for the whole plan, so shared Arcs get identical ids.
+        let serializer = DeduplicatingSerializer::new();
+        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
+            Arc::clone(plan),
+            codec,
+            &serializer,
+        )
+    }
+
+    fn proto_to_physical_expr(
+        &self,
+        proto: &protobuf::PhysicalExprNode,
+        ctx: &TaskContext,
+        input_schema: &Schema,
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<Arc<dyn PhysicalExpr>>
+    where
+        Self: Sized,
+    {
+        let deserializer = DeduplicatingDeserializer::default();
+        deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec)
+    }
+
+    fn physical_expr_to_proto(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<protobuf::PhysicalExprNode> {
+        let serializer = DeduplicatingSerializer::new();
+        serializer.physical_expr_to_proto(expr, codec)
+    }
+}
+
 /// A PhysicalExtensionCodec that tries one of multiple inner codecs
 /// until one works
 #[derive(Debug)]
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 08ce00da4..a38e59acd 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -71,6 +71,7 @@ pub fn serialize_physical_aggr_expr(
     let mut buf = Vec::new();
     codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
     Ok(protobuf::PhysicalExprNode {
+        expr_id: None,
         expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
             protobuf::PhysicalAggregateExprNode {
                 aggregate_function: 
Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
@@ -280,12 +281,14 @@ pub fn serialize_physical_expr_with_converter(
             )),
         };
         return Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::Literal(value)),
         });
     }
 
     if let Some(expr) = expr.downcast_ref::<Column>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
                 protobuf::PhysicalColumn {
                     name: expr.name().to_string(),
@@ -295,6 +298,7 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
                 protobuf::UnknownColumn {
                     name: expr.name().to_string(),
@@ -313,12 +317,14 @@ pub fn serialize_physical_expr_with_converter(
         });
 
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
                 binary_expr,
             )),
         })
     } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: Some(
                 protobuf::physical_expr_node::ExprType::Case(
                     Box::new(
@@ -361,6 +367,7 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
                 protobuf::PhysicalNot {
                     expr: Some(Box::new(
@@ -371,6 +378,7 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
                 Box::new(protobuf::PhysicalIsNull {
                     expr: Some(Box::new(
@@ -381,6 +389,7 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
                 Box::new(protobuf::PhysicalIsNotNull {
                     expr: Some(Box::new(
@@ -391,6 +400,7 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
                 protobuf::PhysicalInListNode {
                     expr: Some(Box::new(
@@ -403,6 +413,7 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
                 protobuf::PhysicalNegativeNode {
                     expr: Some(Box::new(
@@ -413,12 +424,14 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(lit) = expr.downcast_ref::<Literal>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
                 lit.value().try_into()?,
             )),
         })
     } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                 protobuf::PhysicalCastNode {
                     expr: Some(Box::new(
@@ -430,6 +443,7 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
                 protobuf::PhysicalTryCastNode {
                     expr: Some(Box::new(
@@ -443,6 +457,7 @@ pub fn serialize_physical_expr_with_converter(
         let mut buf = Vec::new();
         codec.try_encode_udf(expr.fun(), &mut buf)?;
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
                 protobuf::PhysicalScalarUdfNode {
                     name: expr.name().to_string(),
@@ -459,6 +474,7 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
                 protobuf::PhysicalLikeExprNode {
                     negated: expr.negated(),
@@ -475,6 +491,7 @@ pub fn serialize_physical_expr_with_converter(
     } else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
         let (s0, s1, s2, s3) = expr.seeds();
         Ok(protobuf::PhysicalExprNode {
+            expr_id: None,
             expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
                 protobuf::PhysicalHashExprNode {
                     on_columns: serialize_physical_exprs(
@@ -500,6 +517,7 @@ pub fn serialize_physical_expr_with_converter(
                     .map(|e| proto_converter.physical_expr_to_proto(e, codec))
                     .collect::<Result<_>>()?;
                 Ok(protobuf::PhysicalExprNode {
+                    expr_id: None,
                     expr_type: 
Some(protobuf::physical_expr_node::ExprType::Extension(
                         protobuf::PhysicalExtensionExprNode { expr: buf, 
inputs },
                     )),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 5bb771137..0a26025a3 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -116,8 +116,9 @@ use datafusion_proto::bytes::{
 use 
datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_converter;
 use 
datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
 use datafusion_proto::physical_plan::{
-    AsExecutionPlan, DefaultPhysicalExtensionCodec, 
DefaultPhysicalProtoConverter,
-    PhysicalExtensionCodec, PhysicalProtoConverterExtension,
+    AsExecutionPlan, DeduplicatingProtoConverter, 
DefaultPhysicalExtensionCodec,
+    DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
+    PhysicalProtoConverterExtension,
 };
 use datafusion_proto::protobuf;
 use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
@@ -2564,3 +2565,450 @@ fn custom_proto_converter_intercepts() -> Result<()> {
 
     Ok(())
 }
+
+/// Test that expression deduplication works during deserialization.
+/// When the same expression Arc is serialized multiple times, it should be
+/// deduplicated on deserialization (sharing the same Arc).
+#[test]
+fn test_expression_deduplication() -> Result<()> {
+    let field_a = Field::new("a", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a]));
+
+    // Create a shared expression that will be used multiple times
+    let shared_col: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+
+    // Create an InList expression that uses the same column Arc multiple times
+    // This simulates a real-world scenario where expressions are shared
+    let in_list_expr = in_list(
+        Arc::clone(&shared_col),
+        vec![lit(1i64), lit(2i64), lit(3i64)],
+        &false,
+        &schema,
+    )?;
+
+    // Create a binary expression that uses the shared column and the in_list 
result
+    let binary_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+        Arc::clone(&shared_col),
+        Operator::Eq,
+        lit(42i64),
+    ));
+
+    // Create a plan that has both expressions (they share the `shared_col` 
Arc)
+    let input = Arc::new(EmptyExec::new(schema.clone()));
+    let filter = FilterExecBuilder::new(in_list_expr, input).build()?;
+    let projection_exprs = vec![ProjectionExpr {
+        expr: binary_expr,
+        alias: "result".to_string(),
+    }];
+    let exec_plan =
+        Arc::new(ProjectionExec::try_new(projection_exprs, Arc::new(filter))?);
+
+    let ctx = SessionContext::new();
+    let codec = DefaultPhysicalExtensionCodec {};
+    let proto_converter = DeduplicatingProtoConverter {};
+
+    // Perform roundtrip
+    let bytes = physical_plan_to_bytes_with_proto_converter(
+        Arc::clone(&exec_plan) as Arc<dyn ExecutionPlan>,
+        &codec,
+        &proto_converter,
+    )?;
+
+    // Create a new converter for deserialization (fresh cache)
+    let deser_converter = DeduplicatingProtoConverter {};
+    let result_plan = physical_plan_from_bytes_with_proto_converter(
+        bytes.as_ref(),
+        ctx.task_ctx().as_ref(),
+        &codec,
+        &deser_converter,
+    )?;
+
+    // Verify the plan structure is correct
+    pretty_assertions::assert_eq!(format!("{exec_plan:?}"), 
format!("{result_plan:?}"));
+
+    Ok(())
+}
+
+/// Test that expression deduplication correctly shares Arcs for identical expressions.
+/// Core behavior: two projection entries built from one `Arc` must still point
+/// at a single `Arc` after a serialize/deserialize roundtrip.
+#[test]
+fn test_expression_deduplication_arc_sharing() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
+
+    // One column expression, referenced by both projection entries below
+    let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+    let projection_exprs: Vec<ProjectionExpr> = ["a1", "a2"]
+        .into_iter()
+        .map(|alias| ProjectionExpr {
+            expr: Arc::clone(&col_expr),
+            alias: alias.to_string(),
+        })
+        .collect();
+    let exec_plan = Arc::new(ProjectionExec::try_new(
+        projection_exprs,
+        Arc::new(EmptyExec::new(schema)),
+    )?);
+
+    let ctx = SessionContext::new();
+    let codec = DefaultPhysicalExtensionCodec {};
+
+    // Serialize, then deserialize with an independent converter instance
+    let bytes = physical_plan_to_bytes_with_proto_converter(
+        Arc::clone(&exec_plan) as Arc<dyn ExecutionPlan>,
+        &codec,
+        &DeduplicatingProtoConverter {},
+    )?;
+    let result_plan = physical_plan_from_bytes_with_proto_converter(
+        bytes.as_ref(),
+        ctx.task_ctx().as_ref(),
+        &codec,
+        &DeduplicatingProtoConverter {},
+    )?;
+
+    let projection = result_plan
+        .as_any()
+        .downcast_ref::<ProjectionExec>()
+        .expect("Expected ProjectionExec");
+    let exprs = projection.expr();
+    assert_eq!(exprs.len(), 2);
+
+    // They were one Arc before serialization, so they must be one Arc after it
+    assert!(
+        Arc::ptr_eq(&exprs[0].expr, &exprs[1].expr),
+        "Expected both expressions to share the same Arc after deduplication"
+    );
+
+    Ok(())
+}
+
+/// Test backward compatibility: protos without expr_id should still deserialize
+/// correctly, both with the default converter and with the deduplicating
+/// converter (which must fall back to an uncached parse when `expr_id` is absent).
+#[test]
+fn test_backward_compatibility_no_expr_id() -> Result<()> {
+    let field_a = Field::new("a", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a]));
+
+    // Manually create a proto without expr_id set
+    let proto = PhysicalExprNode {
+        expr_id: None, // Simulating old proto without this field
+        expr_type: Some(
+            datafusion_proto::protobuf::physical_expr_node::ExprType::Column(
+                datafusion_proto::protobuf::PhysicalColumn {
+                    name: "a".to_string(),
+                    index: 0,
+                },
+            ),
+        ),
+    };
+
+    let ctx = SessionContext::new();
+    let task_ctx = ctx.task_ctx();
+    let codec = DefaultPhysicalExtensionCodec {};
+
+    // Checks that a deserialized expression is the expected column `a@0`
+    let assert_column_a = |result: Arc<dyn PhysicalExpr>| {
+        let col = result
+            .as_any()
+            .downcast_ref::<Column>()
+            .expect("Expected Column");
+        assert_eq!(col.name(), "a");
+        assert_eq!(col.index(), 0);
+    };
+
+    // Default converter: should deserialize without error
+    let default_converter = DefaultPhysicalProtoConverter {};
+    assert_column_a(default_converter.proto_to_physical_expr(
+        &proto,
+        task_ctx.as_ref(),
+        &schema,
+        &codec,
+    )?);
+
+    // Deduplicating converter: the missing expr_id must take the uncached path
+    let dedup_converter = DeduplicatingProtoConverter {};
+    assert_column_a(dedup_converter.proto_to_physical_expr(
+        &proto,
+        task_ctx.as_ref(),
+        &schema,
+        &codec,
+    )?);
+
+    Ok(())
+}
+
+/// Test that deduplication works within a single plan deserialization and that
+/// separate deserializations produce independent expressions (no 
cross-operation sharing).
+#[test]
+fn test_deduplication_within_plan_deserialization() -> Result<()> {
+    use datafusion_proto::bytes::{
+        physical_plan_from_bytes_with_proto_converter,
+        physical_plan_to_bytes_with_proto_converter,
+    };
+
+    let field_a = Field::new("a", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a]));
+
+    // Create a plan with expressions that will be deduplicated
+    let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+    let projection_exprs = vec![
+        ProjectionExpr {
+            expr: Arc::clone(&col_expr),
+            alias: "a1".to_string(),
+        },
+        ProjectionExpr {
+            expr: Arc::clone(&col_expr), // Same Arc - will be deduplicated
+            alias: "a2".to_string(),
+        },
+    ];
+    let exec_plan = Arc::new(ProjectionExec::try_new(
+        projection_exprs,
+        Arc::new(EmptyExec::new(schema)),
+    )?);
+
+    let ctx = SessionContext::new();
+    let codec = DefaultPhysicalExtensionCodec {};
+    let proto_converter = DeduplicatingProtoConverter {};
+
+    // Serialize
+    let bytes = physical_plan_to_bytes_with_proto_converter(
+        Arc::clone(&exec_plan) as Arc<dyn ExecutionPlan>,
+        &codec,
+        &proto_converter,
+    )?;
+
+    // First deserialization
+    let plan1 = physical_plan_from_bytes_with_proto_converter(
+        bytes.as_ref(),
+        ctx.task_ctx().as_ref(),
+        &codec,
+        &proto_converter,
+    )?;
+
+    // Check that the plan was deserialized correctly with deduplication
+    let projection1 = plan1
+        .as_any()
+        .downcast_ref::<ProjectionExec>()
+        .expect("Expected ProjectionExec");
+    let exprs1: Vec<_> = projection1.expr().iter().collect();
+    assert_eq!(exprs1.len(), 2);
+    assert!(
+        Arc::ptr_eq(&exprs1[0].expr, &exprs1[1].expr),
+        "Expected both expressions to share the same Arc after deduplication"
+    );
+
+    // Second deserialization
+    let plan2 = physical_plan_from_bytes_with_proto_converter(
+        bytes.as_ref(),
+        ctx.task_ctx().as_ref(),
+        &codec,
+        &proto_converter,
+    )?;
+
+    // Check that the second plan was also deserialized correctly
+    let projection2 = plan2
+        .as_any()
+        .downcast_ref::<ProjectionExec>()
+        .expect("Expected ProjectionExec");
+    let exprs2: Vec<_> = projection2.expr().iter().collect();
+    assert_eq!(exprs2.len(), 2);
+    assert!(
+        Arc::ptr_eq(&exprs2[0].expr, &exprs2[1].expr),
+        "Expected both expressions to share the same Arc after deduplication"
+    );
+
+    // Check that there was no deduplication across deserializations
+    assert!(
+        !Arc::ptr_eq(&exprs1[0].expr, &exprs2[0].expr),
+        "Expected expressions from different deserializations to be different 
Arcs"
+    );
+    assert!(
+        !Arc::ptr_eq(&exprs1[1].expr, &exprs2[1].expr),
+        "Expected expressions from different deserializations to be different 
Arcs"
+    );
+
+    Ok(())
+}
+
+/// Test that deduplication works within direct expression deserialization and 
that
+/// separate deserializations produce independent expressions (no 
cross-operation sharing).
+#[test]
+fn test_deduplication_within_expr_deserialization() -> Result<()> {
+    let field_a = Field::new("a", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a]));
+
+    // Create a binary expression where both sides are the same Arc
+    // This allows us to test deduplication within a single deserialization
+    let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+    let binary_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+        Arc::clone(&col_expr),
+        Operator::Plus,
+        Arc::clone(&col_expr), // Same Arc - will be deduplicated
+    ));
+
+    let ctx = SessionContext::new();
+    let codec = DefaultPhysicalExtensionCodec {};
+    let proto_converter = DeduplicatingProtoConverter {};
+
+    // Serialize the expression
+    let proto = proto_converter.physical_expr_to_proto(&binary_expr, &codec)?;
+
+    // First expression deserialization
+    let expr1 = proto_converter.proto_to_physical_expr(
+        &proto,
+        ctx.task_ctx().as_ref(),
+        &schema,
+        &codec,
+    )?;
+
+    // Check that deduplication worked within the deserialization
+    let binary1 = expr1
+        .as_any()
+        .downcast_ref::<BinaryExpr>()
+        .expect("Expected BinaryExpr");
+    assert!(
+        Arc::ptr_eq(binary1.left(), binary1.right()),
+        "Expected both sides to share the same Arc after deduplication"
+    );
+
+    // Second expression deserialization
+    let expr2 = proto_converter.proto_to_physical_expr(
+        &proto,
+        ctx.task_ctx().as_ref(),
+        &schema,
+        &codec,
+    )?;
+
+    // Check that the second expression was also deserialized correctly
+    let binary2 = expr2
+        .as_any()
+        .downcast_ref::<BinaryExpr>()
+        .expect("Expected BinaryExpr");
+    assert!(
+        Arc::ptr_eq(binary2.left(), binary2.right()),
+        "Expected both sides to share the same Arc after deduplication"
+    );
+
+    // Check that there was no deduplication across deserializations
+    assert!(
+        !Arc::ptr_eq(binary1.left(), binary2.left()),
+        "Expected expressions from different deserializations to be different 
Arcs"
+    );
+    assert!(
+        !Arc::ptr_eq(binary1.right(), binary2.right()),
+        "Expected expressions from different deserializations to be different 
Arcs"
+    );
+
+    Ok(())
+}
+
+/// Test that session_id rotates between top-level serialization operations.
+/// This verifies that each top-level serialization gets a fresh session_id,
+/// which prevents cross-process collisions when serialized plans are merged.
+#[test]
+fn test_session_id_rotation_between_serializations() -> Result<()> {
+    // Create a simple expression; its pointer address stays fixed for both
+    // serializations below.
+    let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+
+    let codec = DefaultPhysicalExtensionCodec {};
+    let proto_converter = DeduplicatingProtoConverter {};
+
+    // First serialization
+    let proto1 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?;
+    let expr_id1 = proto1.expr_id.expect("Expected expr_id to be set");
+
+    // Second serialization with the same converter. The session_id should have
+    // rotated, so the expr_id must differ even though the expression (and hence
+    // its pointer address) is identical.
+    let proto2 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?;
+    let expr_id2 = proto2.expr_id.expect("Expected expr_id to be set");
+
+    assert_ne!(
+        expr_id1, expr_id2,
+        "Expected different expr_ids due to session_id rotation between serializations"
+    );
+
+    // The complementary property (repeated occurrences of one Arc inside a single
+    // top-level operation share an expr_id) cannot be observed here, because each
+    // `physical_expr_to_proto` call is its own top-level operation. It is covered
+    // by the deduplication round-trip tests instead.
+
+    Ok(())
+}
+
+/// Test that session_id rotation works correctly with execution plans.
+/// This verifies the end-to-end behavior with plan serialization: the same plan
+/// serialized twice yields different bytes, yet both decode to the same plan.
+#[test]
+fn test_session_id_rotation_with_execution_plans() -> Result<()> {
+    use datafusion_proto::bytes::{
+        physical_plan_from_bytes_with_proto_converter,
+        physical_plan_to_bytes_with_proto_converter,
+    };
+
+    let field_a = Field::new("a", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a]));
+
+    // Create a simple single-column projection plan
+    let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+    let projection_exprs = vec![ProjectionExpr {
+        expr: Arc::clone(&col_expr),
+        alias: "a1".to_string(),
+    }];
+    let exec_plan = Arc::new(ProjectionExec::try_new(
+        projection_exprs,
+        Arc::new(EmptyExec::new(schema)),
+    )?);
+
+    let codec = DefaultPhysicalExtensionCodec {};
+    let proto_converter = DeduplicatingProtoConverter {};
+
+    // First serialization
+    let bytes1 = physical_plan_to_bytes_with_proto_converter(
+        Arc::clone(&exec_plan) as Arc<dyn ExecutionPlan>,
+        &codec,
+        &proto_converter,
+    )?;
+
+    // Second serialization with the same converter
+    let bytes2 = physical_plan_to_bytes_with_proto_converter(
+        Arc::clone(&exec_plan) as Arc<dyn ExecutionPlan>,
+        &codec,
+        &proto_converter,
+    )?;
+
+    // The serialized bytes should be different due to different session_ids
+    // (specifically, the expr_id values embedded in the protobuf will differ).
+    assert_ne!(
+        bytes1.as_ref(),
+        bytes2.as_ref(),
+        "Expected different serialized bytes due to session_id rotation"
+    );
+
+    // But both should deserialize correctly, using a fresh converter instance.
+    let ctx = SessionContext::new();
+    let deser_converter = DeduplicatingProtoConverter {};
+
+    let plan1 = physical_plan_from_bytes_with_proto_converter(
+        bytes1.as_ref(),
+        ctx.task_ctx().as_ref(),
+        &codec,
+        &deser_converter,
+    )?;
+
+    let plan2 = physical_plan_from_bytes_with_proto_converter(
+        bytes2.as_ref(),
+        ctx.task_ctx().as_ref(),
+        &codec,
+        &deser_converter,
+    )?;
+
+    // Both decoded plans must be the original single-expression projection.
+    for plan in [&plan1, &plan2] {
+        let projection = plan
+            .as_any()
+            .downcast_ref::<ProjectionExec>()
+            .expect("Expected ProjectionExec");
+        assert_eq!(projection.expr().iter().count(), 1);
+        assert_eq!(plan.schema(), exec_plan.schema());
+    }
+    assert_eq!(plan1.schema(), plan2.schema());
+
+    Ok(())
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to