This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a754587812 Port CastExpr to proto hooks (#22569)
a754587812 is described below
commit a754587812a43b27792532b4035c43e20544dad5
Author: Guocheng(Eric) Song <[email protected]>
AuthorDate: Fri May 29 02:09:12 2026 +1000
Port CastExpr to proto hooks (#22569)
## Which issue does this PR close?
- Closes #22428.
## Rationale for this change
`CastExpr` protobuf (de)serialization is currently handled centrally:
encoding by the physical expression downcast chain in `to_proto.rs`, and
decoding by the `ExprType` match in `from_proto.rs`. This PR migrates both
directions to the per-expression `try_to_proto` / `try_from_proto` hooks,
following the existing `Column` / `LikeExpr` pattern.
## What changes are included in this PR?
- Adds `CastExpr::try_to_proto` in the `impl PhysicalExpr for CastExpr`
block.
- Adds inherent `CastExpr::try_from_proto` for decoding
`PhysicalCastNode`.
- Routes `ExprType::Cast` decoding through the new hook.
- Removes the old central `CastExpr` serialization arm from
`to_proto.rs`.
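- Adds direct hook tests covering successful encode/decode, propagation
of child encode/decode errors, and malformed input (a non-cast node, a
missing `expr`, or a missing `arrow_type`).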
## Are these changes tested?
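Yes. The new hooks are covered by unit tests in the `proto_tests` module
of `cast.rs`, which is compiled when the `proto` feature is enabled.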
## Are there any user-facing changes?
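No behavior changes. The protobuf wire format for `CastExpr` remains
unchanged; the only API addition is the public `CastExpr::try_from_proto`
associated function, available with the `proto` feature.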
---
datafusion/physical-expr/src/expressions/cast.rs | 216 +++++++++++++++++++++++
datafusion/proto/src/physical_plan/from_proto.rs | 12 +-
datafusion/proto/src/physical_plan/to_proto.rs | 16 +-
3 files changed, 219 insertions(+), 25 deletions(-)
diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index ad214a89ce..26f06b546a 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -298,6 +298,61 @@ impl PhysicalExpr for CastExpr {
write!(f, ")")
}
+
+ #[cfg(feature = "proto")]
+ fn try_to_proto(
+ &self,
+ ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
+ ) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
+ use datafusion_proto_models::protobuf;
+
+ Ok(Some(protobuf::PhysicalExprNode {
+ expr_id: None,
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
+ protobuf::PhysicalCastNode {
+ expr: Some(Box::new(ctx.encode_child(self.expr())?)),
+ arrow_type: Some(self.cast_type().try_into()?),
+ },
+ ))),
+ }))
+ }
+}
+
+#[cfg(feature = "proto")]
+impl CastExpr {
+ /// Reconstruct a [`CastExpr`] from its protobuf representation.
+ ///
+ /// Takes the whole [`PhysicalExprNode`] so the decode signature matches
+ /// other migrated expressions and can inspect outer-node metadata if
+ /// needed in the future.
+ ///
+ /// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
+ pub fn try_from_proto(
+ node: &datafusion_proto_models::protobuf::PhysicalExprNode,
+ ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ use datafusion_common::internal_datafusion_err;
+ use datafusion_common::internal_err;
+ use datafusion_proto_models::protobuf;
+
+ let cast_expr = match &node.expr_type {
+ Some(protobuf::physical_expr_node::ExprType::Cast(cast_expr)) => {
+ cast_expr.as_ref()
+ }
+ _ => return internal_err!("PhysicalExprNode is not a CastExpr"),
+ };
+
+ let expr = ctx.decode_required_expression(
+ cast_expr.expr.as_deref(),
+ "CastExpr",
+ "expr",
+ )?;
+ let arrow_type = cast_expr.arrow_type.as_ref().ok_or_else(|| {
+ internal_datafusion_err!("CastExpr is missing required field 'arrow_type'")
+ })?;
+
+ Ok(Arc::new(CastExpr::new(expr, arrow_type.try_into()?, None)))
+ }
}
/// Return a PhysicalExpression representing `expr` casted to
@@ -1154,3 +1209,164 @@ mod tests {
Ok(())
}
}
+
+/// Tests for the `try_to_proto` / `try_from_proto` hooks.
+#[cfg(all(test, feature = "proto"))]
+mod proto_tests {
+ use super::*;
+ use crate::expressions::{Column, col};
+ use crate::proto_test_util::{
+ StubDecoder, StubEncoder, UnreachableDecoder, column_node,
+ };
+ use arrow::datatypes::Field;
+ use datafusion_common::DataFusionError;
+ use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx;
+ use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx;
+ use datafusion_proto_models::datafusion_common::ArrowType;
+ use datafusion_proto_models::protobuf::{
+ PhysicalCastNode, PhysicalExprNode, physical_expr_node,
+ };
+
+ /// A `CastExpr` over an `Int32` column, casting to `Int64`.
+ fn proto_cast_fixture() -> CastExpr {
+ let schema = Schema::new(vec![Field::new("a", Int32, false)]);
+ CastExpr::new(col("a", &schema).unwrap(), Int64, None)
+ }
+
+ fn proto_int64_arrow_type() -> ArrowType {
+ (&Int64).try_into().unwrap()
+ }
+
+ /// Build a `CastExpr` proto node with the given child and target type.
+ fn proto_cast_node(
+ expr: Option<Box<PhysicalExprNode>>,
+ arrow_type: Option<ArrowType>,
+ ) -> PhysicalExprNode {
+ PhysicalExprNode {
+ expr_id: None,
+ expr_type: Some(physical_expr_node::ExprType::Cast(Box::new(
+ PhysicalCastNode { expr, arrow_type },
+ ))),
+ }
+ }
+
+ #[test]
+ fn try_to_proto_encodes_cast_expr() {
+ let cast = proto_cast_fixture();
+ let encoder = StubEncoder::ok();
+ let ctx = PhysicalExprEncodeCtx::new(&encoder);
+
+ let node = cast
+ .try_to_proto(&ctx)
+ .unwrap()
+ .expect("CastExpr should encode to Some(node)");
+
+ assert!(node.expr_id.is_none());
+ let cast_node = match node.expr_type {
+ Some(physical_expr_node::ExprType::Cast(cast_node)) => *cast_node,
+ other => panic!("expected a Cast node, got {other:?}"),
+ };
+ assert!(cast_node.expr.is_some());
+
+ let arrow_type = cast_node
+ .arrow_type
+ .as_ref()
+ .expect("cast type should be encoded");
+ let data_type: DataType = arrow_type.try_into().unwrap();
+ assert_eq!(data_type, Int64);
+ }
+
+ #[test]
+ fn try_to_proto_propagates_child_encode_error() {
+ let cast = proto_cast_fixture();
+ let encoder = StubEncoder::failing_on(1);
+ let ctx = PhysicalExprEncodeCtx::new(&encoder);
+
+ let err = cast.try_to_proto(&ctx).unwrap_err();
+ assert!(matches!(
+ err,
+ DataFusionError::Internal(msg) if msg.contains("call 1")
+ ));
+ }
+
+ #[test]
+ fn try_from_proto_decodes_cast_expr() {
+ let node = proto_cast_node(
+ Some(Box::new(column_node("a"))),
+ Some(proto_int64_arrow_type()),
+ );
+ let schema = Schema::empty();
+ let decoder = StubDecoder::ok();
+ let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+ let decoded = CastExpr::try_from_proto(&node, &ctx).unwrap();
+ let cast = decoded
+ .downcast_ref::<CastExpr>()
+ .expect("decoded expr should be a CastExpr");
+
+ assert_eq!(cast.cast_type(), &Int64);
+ assert!(cast.expr().downcast_ref::<Column>().is_some());
+ }
+
+ #[test]
+ fn try_from_proto_rejects_non_cast_node() {
+ let node = column_node("a");
+ let schema = Schema::empty();
+ let decoder = UnreachableDecoder;
+ let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+ let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err();
+ assert!(matches!(
+ err,
+ DataFusionError::Internal(msg)
+ if msg.contains("PhysicalExprNode is not a CastExpr")
+ ));
+ }
+
+ #[test]
+ fn try_from_proto_rejects_missing_expr() {
+ let node = proto_cast_node(None, Some(proto_int64_arrow_type()));
+ let schema = Schema::empty();
+ let decoder = UnreachableDecoder;
+ let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+ let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err();
+ assert!(matches!(
+ err,
+ DataFusionError::Internal(msg)
+ if msg.contains("CastExpr is missing required field 'expr'")
+ ));
+ }
+
+ #[test]
+ fn try_from_proto_rejects_missing_arrow_type() {
+ let node = proto_cast_node(Some(Box::new(column_node("a"))), None);
+ let schema = Schema::empty();
+ let decoder = StubDecoder::ok();
+ let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+ let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err();
+ assert!(matches!(
+ err,
+ DataFusionError::Internal(msg)
+ if msg.contains("CastExpr is missing required field 'arrow_type'")
+ ));
+ }
+
+ #[test]
+ fn try_from_proto_propagates_child_decode_error() {
+ let node = proto_cast_node(
+ Some(Box::new(column_node("a"))),
+ Some(proto_int64_arrow_type()),
+ );
+ let schema = Schema::empty();
+ let decoder = StubDecoder::failing_on(1);
+ let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+ let err = CastExpr::try_from_proto(&node, &ctx).unwrap_err();
+ assert!(matches!(
+ err,
+ DataFusionError::Internal(msg) if msg.contains("call 1")
+ ));
+ }
+}
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index a3839ad213..41f470aeb6 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -360,17 +360,7 @@ pub fn parse_physical_expr_with_converter(
})
.transpose()?,
)?),
- ExprType::Cast(e) => Arc::new(CastExpr::new(
- parse_required_physical_expr(
- e.expr.as_deref(),
- ctx,
- "expr",
- input_schema,
- proto_converter,
- )?,
- convert_required!(e.arrow_type)?,
- None,
- )),
+ ExprType::Cast(_) => CastExpr::try_from_proto(proto, &decode_ctx)?,
ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
parse_required_physical_expr(
e.expr.as_deref(),
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 9926e733e8..5c45c27502 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_plan::expressions::{
- CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal,
- NotExpr, TryCastExpr, UnKnownColumn,
+ CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, NotExpr,
+ TryCastExpr, UnKnownColumn,
};
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
@@ -395,18 +395,6 @@ pub fn serialize_physical_expr_with_converter(
lit.value().try_into()?,
)),
})
- } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_id,
- expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
- protobuf::PhysicalCastNode {
- expr: Some(Box::new(
- proto_converter.physical_expr_to_proto(cast.expr(), codec)?,
- )),
- arrow_type: Some(cast.cast_type().try_into()?),
- },
- ))),
- })
} else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_id,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]