This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a754587812 Port CastExpr to proto hooks (#22569)
a754587812 is described below

commit a754587812a43b27792532b4035c43e20544dad5
Author: Guocheng(Eric) Song <[email protected]>
AuthorDate: Fri May 29 02:09:12 2026 +1000

    Port CastExpr to proto hooks (#22569)
    
    ## Which issue does this PR close?
    
    - Closes #22428.
    
    ## Rationale for this change
    
    `CastExpr` protobuf serialization is currently handled by the central
    physical expression downcast chain. This PR migrates it to the
    per-expression `try_to_proto` / `try_from_proto` hooks, following the
    existing `Column` / `LikeExpr` pattern.
    
    ## What changes are included in this PR?
    
    - Adds `CastExpr::try_to_proto` in the `impl PhysicalExpr for CastExpr`
    block.
    - Adds inherent `CastExpr::try_from_proto` for decoding
    `PhysicalCastNode`.
    - Routes `ExprType::Cast` decoding through the new hook.
    - Removes the old central `CastExpr` serialization arm from
    `to_proto.rs`.
    - Adds direct hook tests for successful encode/decode and bad-input
    cases.
    
    ## Are these changes tested?
    
    Yes
    
    ## Are there any user-facing changes?
    
    No. The protobuf wire format for `CastExpr` remains unchanged.
---
 datafusion/physical-expr/src/expressions/cast.rs | 216 +++++++++++++++++++++++
 datafusion/proto/src/physical_plan/from_proto.rs |  12 +-
 datafusion/proto/src/physical_plan/to_proto.rs   |  16 +-
 3 files changed, 219 insertions(+), 25 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index ad214a89ce..26f06b546a 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -298,6 +298,61 @@ impl PhysicalExpr for CastExpr {
 
         write!(f, ")")
     }
+
+    #[cfg(feature = "proto")]
+    fn try_to_proto(
+        &self,
+        ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
+    ) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
+        use datafusion_proto_models::protobuf;
+
+        Ok(Some(protobuf::PhysicalExprNode {
+            expr_id: None,
+            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
+                protobuf::PhysicalCastNode {
+                    expr: Some(Box::new(ctx.encode_child(self.expr())?)),
+                    arrow_type: Some(self.cast_type().try_into()?),
+                },
+            ))),
+        }))
+    }
+}
+
+#[cfg(feature = "proto")]
+impl CastExpr {
+    /// Reconstruct a [`CastExpr`] from its protobuf representation.
+    ///
+    /// Takes the whole [`PhysicalExprNode`] so the decode signature matches
+    /// other migrated expressions and can inspect outer-node metadata if
+    /// needed in the future.
+    ///
+    /// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
+    pub fn try_from_proto(
+        node: &datafusion_proto_models::protobuf::PhysicalExprNode,
+        ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_common::internal_datafusion_err;
+        use datafusion_common::internal_err;
+        use datafusion_proto_models::protobuf;
+
+        let cast_expr = match &node.expr_type {
+            Some(protobuf::physical_expr_node::ExprType::Cast(cast_expr)) => {
+                cast_expr.as_ref()
+            }
+            _ => return internal_err!("PhysicalExprNode is not a CastExpr"),
+        };
+
+        let expr = ctx.decode_required_expression(
+            cast_expr.expr.as_deref(),
+            "CastExpr",
+            "expr",
+        )?;
+        let arrow_type = cast_expr.arrow_type.as_ref().ok_or_else(|| {
+            internal_datafusion_err!("CastExpr is missing required field 'arrow_type'")
+        })?;
+
+        Ok(Arc::new(CastExpr::new(expr, arrow_type.try_into()?, None)))
+    }
 }
 
 /// Return a PhysicalExpression representing `expr` casted to
@@ -1154,3 +1209,164 @@ mod tests {
         Ok(())
     }
 }
+
+/// Tests for the `try_to_proto` / `try_from_proto` hooks.
+#[cfg(all(test, feature = "proto"))]
+mod proto_tests {
+    use super::*;
+    use crate::expressions::{Column, col};
+    use crate::proto_test_util::{
+        StubDecoder, StubEncoder, UnreachableDecoder, column_node,
+    };
+    use arrow::datatypes::Field;
+    use datafusion_common::DataFusionError;
+    use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx;
+    use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx;
+    use datafusion_proto_models::datafusion_common::ArrowType;
+    use datafusion_proto_models::protobuf::{
+        PhysicalCastNode, PhysicalExprNode, physical_expr_node,
+    };
+
+    /// A `CastExpr` over an `Int32` column, casting to `Int64`.
+    fn proto_cast_fixture() -> CastExpr {
+        let schema = Schema::new(vec![Field::new("a", Int32, false)]);
+        CastExpr::new(col("a", &schema).unwrap(), Int64, None)
+    }
+
+    fn proto_int64_arrow_type() -> ArrowType {
+        (&Int64).try_into().unwrap()
+    }
+
+    /// Build a `CastExpr` proto node with the given child and target type.
+    fn proto_cast_node(
+        expr: Option<Box<PhysicalExprNode>>,
+        arrow_type: Option<ArrowType>,
+    ) -> PhysicalExprNode {
+        PhysicalExprNode {
+            expr_id: None,
+            expr_type: Some(physical_expr_node::ExprType::Cast(Box::new(
+                PhysicalCastNode { expr, arrow_type },
+            ))),
+        }
+    }
+
+    #[test]
+    fn try_to_proto_encodes_cast_expr() {
+        let cast = proto_cast_fixture();
+        let encoder = StubEncoder::ok();
+        let ctx = PhysicalExprEncodeCtx::new(&encoder);
+
+        let node = cast
+            .try_to_proto(&ctx)
+            .unwrap()
+            .expect("CastExpr should encode to Some(node)");
+
+        assert!(node.expr_id.is_none());
+        let cast_node = match node.expr_type {
+            Some(physical_expr_node::ExprType::Cast(cast_node)) => *cast_node,
+            other => panic!("expected a Cast node, got {other:?}"),
+        };
+        assert!(cast_node.expr.is_some());
+
+        let arrow_type = cast_node
+            .arrow_type
+            .as_ref()
+            .expect("cast type should be encoded");
+        let data_type: DataType = arrow_type.try_into().unwrap();
+        assert_eq!(data_type, Int64);
+    }
+
+    #[test]
+    fn try_to_proto_propagates_child_encode_error() {
+        let cast = proto_cast_fixture();
+        let encoder = StubEncoder::failing_on(1);
+        let ctx = PhysicalExprEncodeCtx::new(&encoder);
+
+        let err = cast.try_to_proto(&ctx).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg) if msg.contains("call 1")
+        ));
+    }
+
+    #[test]
+    fn try_from_proto_decodes_cast_expr() {
+        let node = proto_cast_node(
+            Some(Box::new(column_node("a"))),
+            Some(proto_int64_arrow_type()),
+        );
+        let schema = Schema::empty();
+        let decoder = StubDecoder::ok();
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+        let decoded = CastExpr::try_from_proto(&node, &ctx).unwrap();
+        let cast = decoded
+            .downcast_ref::<CastExpr>()
+            .expect("decoded expr should be a CastExpr");
+
+        assert_eq!(cast.cast_type(), &Int64);
+        assert!(cast.expr().downcast_ref::<Column>().is_some());
+    }
+
+    #[test]
+    fn try_from_proto_rejects_non_cast_node() {
+        let node = column_node("a");
+        let schema = Schema::empty();
+        let decoder = UnreachableDecoder;
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+        let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg)
+                if msg.contains("PhysicalExprNode is not a CastExpr")
+        ));
+    }
+
+    #[test]
+    fn try_from_proto_rejects_missing_expr() {
+        let node = proto_cast_node(None, Some(proto_int64_arrow_type()));
+        let schema = Schema::empty();
+        let decoder = UnreachableDecoder;
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+        let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg)
+                if msg.contains("CastExpr is missing required field 'expr'")
+        ));
+    }
+
+    #[test]
+    fn try_from_proto_rejects_missing_arrow_type() {
+        let node = proto_cast_node(Some(Box::new(column_node("a"))), None);
+        let schema = Schema::empty();
+        let decoder = StubDecoder::ok();
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+        let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg)
+                if msg.contains("CastExpr is missing required field 'arrow_type'")
+        ));
+    }
+
+    #[test]
+    fn try_from_proto_propagates_child_decode_error() {
+        let node = proto_cast_node(
+            Some(Box::new(column_node("a"))),
+            Some(proto_int64_arrow_type()),
+        );
+        let schema = Schema::empty();
+        let decoder = StubDecoder::failing_on(1);
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+        let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg) if msg.contains("call 1")
+        ));
+    }
+}
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index a3839ad213..41f470aeb6 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -360,17 +360,7 @@ pub fn parse_physical_expr_with_converter(
                 })
                 .transpose()?,
         )?),
-        ExprType::Cast(e) => Arc::new(CastExpr::new(
-            parse_required_physical_expr(
-                e.expr.as_deref(),
-                ctx,
-                "expr",
-                input_schema,
-                proto_converter,
-            )?,
-            convert_required!(e.arrow_type)?,
-            None,
-        )),
+        ExprType::Cast(_) => CastExpr::try_from_proto(proto, &decode_ctx)?,
         ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
             parse_required_physical_expr(
                 e.expr.as_deref(),
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 9926e733e8..5c45c27502 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
 use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use datafusion_physical_plan::expressions::{
-    CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal,
-    NotExpr, TryCastExpr, UnKnownColumn,
+    CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, NotExpr,
+    TryCastExpr, UnKnownColumn,
 };
 use datafusion_physical_plan::udaf::AggregateFunctionExpr;
 use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
@@ -395,18 +395,6 @@ pub fn serialize_physical_expr_with_converter(
                 lit.value().try_into()?,
             )),
         })
-    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
-        Ok(protobuf::PhysicalExprNode {
-            expr_id,
-            expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
-                protobuf::PhysicalCastNode {
-                    expr: Some(Box::new(
-                        proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
-                    )),
-                    arrow_type: Some(cast.cast_type().try_into()?),
-                },
-            ))),
-        })
     } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
         Ok(protobuf::PhysicalExprNode {
             expr_id,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to