This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22569-2888bce68bc4ca745ce0c47dc004e7b71f91b0a4 in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit a754587812a43b27792532b4035c43e20544dad5
Author: Guocheng(Eric) Song <[email protected]>
AuthorDate: Fri May 29 02:09:12 2026 +1000

    Port CastExpr to proto hooks (#22569)

    ## Which issue does this PR close?

    - Closes #22428.

    ## Rationale for this change

    `CastExpr` protobuf serialization is currently handled by the central
    physical expression downcast chain. This PR migrates it to the
    per-expression `try_to_proto` / `try_from_proto` hooks, following the
    existing `Column` / `LikeExpr` pattern.

    ## What changes are included in this PR?

    - Adds `CastExpr::try_to_proto` in the `impl PhysicalExpr for CastExpr` block.
    - Adds inherent `CastExpr::try_from_proto` for decoding `PhysicalCastNode`.
    - Routes `ExprType::Cast` decoding through the new hook.
    - Removes the old central `CastExpr` serialization arm from `to_proto.rs`.
    - Adds direct hook tests for successful encode/decode and bad-input cases.

    ## Are these changes tested?

    Yes

    ## Are there any user-facing changes?

    No. The protobuf wire format for `CastExpr` remains unchanged.
--- datafusion/physical-expr/src/expressions/cast.rs | 216 +++++++++++++++++++++++ datafusion/proto/src/physical_plan/from_proto.rs | 12 +- datafusion/proto/src/physical_plan/to_proto.rs | 16 +- 3 files changed, 219 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index ad214a89ce..26f06b546a 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -298,6 +298,61 @@ impl PhysicalExpr for CastExpr { write!(f, ")") } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( + protobuf::PhysicalCastNode { + expr: Some(Box::new(ctx.encode_child(self.expr())?)), + arrow_type: Some(self.cast_type().try_into()?), + }, + ))), + })) + } +} + +#[cfg(feature = "proto")] +impl CastExpr { + /// Reconstruct a [`CastExpr`] from its protobuf representation. + /// + /// Takes the whole [`PhysicalExprNode`] so the decode signature matches + /// other migrated expressions and can inspect outer-node metadata if + /// needed in the future. 
+ /// + /// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result<Arc<dyn PhysicalExpr>> { + use datafusion_common::internal_datafusion_err; + use datafusion_common::internal_err; + use datafusion_proto_models::protobuf; + + let cast_expr = match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::Cast(cast_expr)) => { + cast_expr.as_ref() + } + _ => return internal_err!("PhysicalExprNode is not a CastExpr"), + }; + + let expr = ctx.decode_required_expression( + cast_expr.expr.as_deref(), + "CastExpr", + "expr", + )?; + let arrow_type = cast_expr.arrow_type.as_ref().ok_or_else(|| { + internal_datafusion_err!("CastExpr is missing required field 'arrow_type'") + })?; + + Ok(Arc::new(CastExpr::new(expr, arrow_type.try_into()?, None))) + } } /// Return a PhysicalExpression representing `expr` casted to @@ -1154,3 +1209,164 @@ mod tests { Ok(()) } } + +/// Tests for the `try_to_proto` / `try_from_proto` hooks. +#[cfg(all(test, feature = "proto"))] +mod proto_tests { + use super::*; + use crate::expressions::{Column, col}; + use crate::proto_test_util::{ + StubDecoder, StubEncoder, UnreachableDecoder, column_node, + }; + use arrow::datatypes::Field; + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx; + use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx; + use datafusion_proto_models::datafusion_common::ArrowType; + use datafusion_proto_models::protobuf::{ + PhysicalCastNode, PhysicalExprNode, physical_expr_node, + }; + + /// A `CastExpr` over an `Int32` column, casting to `Int64`. 
+ fn proto_cast_fixture() -> CastExpr { + let schema = Schema::new(vec![Field::new("a", Int32, false)]); + CastExpr::new(col("a", &schema).unwrap(), Int64, None) + } + + fn proto_int64_arrow_type() -> ArrowType { + (&Int64).try_into().unwrap() + } + + /// Build a `CastExpr` proto node with the given child and target type. + fn proto_cast_node( + expr: Option<Box<PhysicalExprNode>>, + arrow_type: Option<ArrowType>, + ) -> PhysicalExprNode { + PhysicalExprNode { + expr_id: None, + expr_type: Some(physical_expr_node::ExprType::Cast(Box::new( + PhysicalCastNode { expr, arrow_type }, + ))), + } + } + + #[test] + fn try_to_proto_encodes_cast_expr() { + let cast = proto_cast_fixture(); + let encoder = StubEncoder::ok(); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = cast + .try_to_proto(&ctx) + .unwrap() + .expect("CastExpr should encode to Some(node)"); + + assert!(node.expr_id.is_none()); + let cast_node = match node.expr_type { + Some(physical_expr_node::ExprType::Cast(cast_node)) => *cast_node, + other => panic!("expected a Cast node, got {other:?}"), + }; + assert!(cast_node.expr.is_some()); + + let arrow_type = cast_node + .arrow_type + .as_ref() + .expect("cast type should be encoded"); + let data_type: DataType = arrow_type.try_into().unwrap(); + assert_eq!(data_type, Int64); + } + + #[test] + fn try_to_proto_propagates_child_encode_error() { + let cast = proto_cast_fixture(); + let encoder = StubEncoder::failing_on(1); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let err = cast.try_to_proto(&ctx).unwrap_err(); + assert!(matches!( + err, + DataFusionError::Internal(msg) if msg.contains("call 1") + )); + } + + #[test] + fn try_from_proto_decodes_cast_expr() { + let node = proto_cast_node( + Some(Box::new(column_node("a"))), + Some(proto_int64_arrow_type()), + ); + let schema = Schema::empty(); + let decoder = StubDecoder::ok(); + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let decoded = CastExpr::try_from_proto(&node, 
&ctx).unwrap(); + let cast = decoded + .downcast_ref::<CastExpr>() + .expect("decoded expr should be a CastExpr"); + + assert_eq!(cast.cast_type(), &Int64); + assert!(cast.expr().downcast_ref::<Column>().is_some()); + } + + #[test] + fn try_from_proto_rejects_non_cast_node() { + let node = column_node("a"); + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!(matches!( + err, + DataFusionError::Internal(msg) + if msg.contains("PhysicalExprNode is not a CastExpr") + )); + } + + #[test] + fn try_from_proto_rejects_missing_expr() { + let node = proto_cast_node(None, Some(proto_int64_arrow_type())); + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!(matches!( + err, + DataFusionError::Internal(msg) + if msg.contains("CastExpr is missing required field 'expr'") + )); + } + + #[test] + fn try_from_proto_rejects_missing_arrow_type() { + let node = proto_cast_node(Some(Box::new(column_node("a"))), None); + let schema = Schema::empty(); + let decoder = StubDecoder::ok(); + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!(matches!( + err, + DataFusionError::Internal(msg) + if msg.contains("CastExpr is missing required field 'arrow_type'") + )); + } + + #[test] + fn try_from_proto_propagates_child_decode_error() { + let node = proto_cast_node( + Some(Box::new(column_node("a"))), + Some(proto_int64_arrow_type()), + ); + let schema = Schema::empty(); + let decoder = StubDecoder::failing_on(1); + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!(matches!( + err, + DataFusionError::Internal(msg) if msg.contains("call 
1") + )); + } +} diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index a3839ad213..41f470aeb6 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -360,17 +360,7 @@ pub fn parse_physical_expr_with_converter( }) .transpose()?, )?), - ExprType::Cast(e) => Arc::new(CastExpr::new( - parse_required_physical_expr( - e.expr.as_deref(), - ctx, - "expr", - input_schema, - proto_converter, - )?, - convert_required!(e.arrow_type)?, - None, - )), + ExprType::Cast(_) => CastExpr::try_from_proto(proto, &decode_ctx)?, ExprType::TryCast(e) => Arc::new(TryCastExpr::new( parse_required_physical_expr( e.expr.as_deref(), diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 9926e733e8..5c45c27502 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, - NotExpr, TryCastExpr, UnKnownColumn, + CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, NotExpr, + TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::udaf::AggregateFunctionExpr; use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; @@ -395,18 +395,6 @@ pub fn serialize_physical_expr_with_converter( lit.value().try_into()?, )), }) - } else if let Some(cast) = expr.downcast_ref::<CastExpr>() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( - protobuf::PhysicalCastNode { - expr: Some(Box::new( - 
proto_converter.physical_expr_to_proto(cast.expr(), codec)?, - )), - arrow_type: Some(cast.cast_type().try_into()?), - }, - ))), - }) } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() { Ok(protobuf::PhysicalExprNode { expr_id, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
