This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22636-c1f0d54bb9e1054e7c7212037e970b3d3b34885c in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit d1ec74e0d16aacede8091224ca2f8463f0671842
Author: Kanishk Sachan <[email protected]>
AuthorDate: Fri Jun 5 02:38:50 2026 +0100

    feat(physical-expr): port Literal to try_to_proto / try_from_proto hooks (#22636)

    ## Which issue does this PR close?

    Closes #22427

    ## Rationale for this change

    `Literal` serialization/deserialization lived in the central downcast chains in `to_proto.rs` and `from_proto.rs`. This PR moves it into self-contained `try_to_proto` / `try_from_proto` hooks on `Literal` itself, following the pattern established for `NotExpr`, `NegativeExpr`, `IsNullExpr`, and `IsNotNullExpr`.

    ## What changes are included in this PR?

    - `datafusion/physical-expr/src/expressions/literal.rs`
      - Added `#[cfg(feature = "proto")] fn try_to_proto(...)` inside `impl PhysicalExpr for Literal`
      - Added `#[cfg(feature = "proto")] impl Literal { pub fn try_from_proto(...) }`
      - Added `proto_tests` module with encode, null-literal, roundtrip, and reject-wrong-variant tests
    - `datafusion/proto/src/physical_plan/to_proto.rs`
      - Removed the `Literal` downcast arm; removed `Literal` from import list
    - `datafusion/proto/src/physical_plan/from_proto.rs`
      - Replaced the inline `ExprType::Literal` arm with `Literal::try_from_proto(proto, &decode_ctx)?`

    ## Are there any user-facing changes?

    No. Serialization behaviour is identical; only the code location changed.

🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Kanishk Sachan <koopatroopa787> --- .../physical-expr/src/expressions/literal.rs | 135 +++++++++++++++++++++ datafusion/proto/src/physical_plan/from_proto.rs | 2 +- datafusion/proto/src/physical_plan/to_proto.rs | 11 +- 3 files changed, 137 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 7351158c54..5fb9a3b2cd 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -133,6 +133,41 @@ impl PhysicalExpr for Literal { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::Literal } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( + (&self.value).try_into()?, + )), + })) + } +} + +#[cfg(feature = "proto")] +impl Literal { + /// Reconstruct a [`Literal`] from its protobuf representation. + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result<Arc<dyn PhysicalExpr>> { + use datafusion_physical_expr_common::expect_expr_variant; + use datafusion_proto_models::protobuf; + + let scalar_proto = expect_expr_variant!( + node, + protobuf::physical_expr_node::ExprType::Literal, + "Literal", + ); + let value = ScalarValue::try_from(scalar_proto)?; + Ok(Arc::new(Literal::new(value))) + } } /// Create a literal expression @@ -190,3 +225,103 @@ mod tests { Ok(()) } } + +/// Tests for the `try_to_proto` / `try_from_proto` hooks. 
+#[cfg(all(test, feature = "proto"))] +mod proto_tests { + use super::*; + use crate::proto_test_util::{StubEncoder, UnreachableDecoder, column_node}; + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx; + use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx; + use datafusion_proto_models::protobuf::physical_expr_node; + + fn i32_literal() -> Literal { + Literal::new(ScalarValue::Int32(Some(42))) + } + + // ── try_to_proto ───────────────────────────────────────────────────────── + + #[test] + fn try_to_proto_encodes_literal() { + let literal = i32_literal(); + let encoder = StubEncoder::ok(); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = literal + .try_to_proto(&ctx) + .unwrap() + .expect("Literal should encode to Some(node)"); + + // Literal nodes never set expr_id. + assert!(node.expr_id.is_none()); + // Variant must be Literal, not any other expr type. + assert!(matches!( + node.expr_type, + Some(physical_expr_node::ExprType::Literal(_)) + )); + } + + #[test] + fn try_to_proto_null_literal() { + let literal = Literal::new(ScalarValue::Int32(None)); + let encoder = StubEncoder::ok(); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = literal + .try_to_proto(&ctx) + .unwrap() + .expect("null Literal should encode to Some(node)"); + + assert!(matches!( + node.expr_type, + Some(physical_expr_node::ExprType::Literal(_)) + )); + + // Decode and verify the null payload round-trips correctly. 
+ let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let dec_ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let decoded = Literal::try_from_proto(&node, &dec_ctx).unwrap(); + let lit = decoded + .downcast_ref::<Literal>() + .expect("decoded expr should be a Literal"); + assert_eq!(lit.value(), &ScalarValue::Int32(None)); + } + + // ── try_from_proto ─────────────────────────────────────────────────────── + + #[test] + fn try_from_proto_roundtrip() { + let original = i32_literal(); + let encoder = StubEncoder::ok(); + let enc_ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = original + .try_to_proto(&enc_ctx) + .unwrap() + .expect("should encode"); + + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let dec_ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let decoded = Literal::try_from_proto(&node, &dec_ctx).unwrap(); + let lit = decoded + .downcast_ref::<Literal>() + .expect("decoded expr should be a Literal"); + assert_eq!(lit.value(), &ScalarValue::Int32(Some(42))); + } + + #[test] + fn try_from_proto_rejects_non_literal_node() { + let node = column_node("a"); + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let err = Literal::try_from_proto(&node, &ctx).unwrap_err(); + assert!( + matches!(err, DataFusionError::Internal(ref msg) if msg.contains("PhysicalExprNode is not a Literal")) + ); + } +} diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 21d700de89..c886633999 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -283,7 +283,7 @@ pub fn parse_physical_expr_with_converter( // to the right constructor. 
ExprType::Column(_) => Column::try_from_proto(proto, &decode_ctx)?, ExprType::UnknownColumn(_) => UnKnownColumn::try_from_proto(proto, &decode_ctx)?, - ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)), + ExprType::Literal(_) => Literal::try_from_proto(proto, &decode_ctx)?, ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?, ExprType::AggregateExpr(_) => { return not_impl_err!( diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 096ed46935..c45d432f9a 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -35,9 +35,7 @@ use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use datafusion_physical_plan::expressions::{ - CaseExpr, DynamicFilterPhysicalExpr, Literal, -}; +use datafusion_physical_plan::expressions::{CaseExpr, DynamicFilterPhysicalExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; use datafusion_physical_plan::{ @@ -345,13 +343,6 @@ pub fn serialize_physical_expr_with_converter( ), ), }) - } else if let Some(lit) = expr.downcast_ref::<Literal>() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( - lit.value().try_into()?, - )), - }) } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() { let mut buf = Vec::new(); codec.try_encode_udf(expr.fun(), &mut buf)?; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
