This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22596-69786d8420bf1d3c81c2074602286915ae3a829e
in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit c7f35d6f0595662d684a1c7eb64d8bb5599aeeee
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu May 28 12:48:28 2026 -0500

refactor(physical-expr-common): add proto helpers for the recurring shapes in #22418, port already-migrated exprs (#22596)

## Which issue does this PR close?

Part of #22418 / follow-up to #22513.

## Rationale for this change

Every PR that migrates a `PhysicalExpr` to `try_to_proto` / `try_from_proto` under #22418 re-implements the same two shapes, and the existing helpers from #22513 do not cover either of them:

1. The outer `match &node.expr_type { ... }` that opens every `try_from_proto`:

   ```rust
   let try_cast = match &node.expr_type {
       Some(protobuf::physical_expr_node::ExprType::TryCast(x)) => x.as_ref(),
       _ => return internal_err!("PhysicalExprNode is not a TryCastExpr"),
   };
   ```

2. The hand-rolled "missing required field 'X'" error for non-expression fields, such as `arrow_type` on `CastExpr` / `TryCastExpr`.

Both shapes are copied into each of the 7+ migration PRs that are still open. Adding small helpers in `physical-expr-common` keeps each per-expression diff minimal and the error messages consistent.

## What changes are included in this PR?

**Commit 1: `feat(physical-expr-common): add proto helpers ...`**

Two new helpers in `datafusion-physical-expr-common`, both gated on `feature = "proto"`:

- `expect_expr_variant!` macro (re-exported at the crate root): matches on the `Option<ExprType>` and returns the inner payload. Otherwise it errors with `"PhysicalExprNode is not a {expr_name}"`, where `expr_name` is the literal the caller passes in.
- `proto_decode::require_proto_field<T>(opt, expr_name, field)`: mirrors `decode_required_expression` for fields that are not `PhysicalExprNode`s.

Five unit tests cover the helpers. They exercise the success and missing-field paths of `require_proto_field`, plus the success path and the two reject paths (wrong variant, missing `expr_type`) of the macro.
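Below is a minimal, self-contained sketch of the two helper shapes for readers who do not have the proto crates at hand. It is not the crate code. `PhysicalExprNode`, `ExprType`, `PhysicalColumn` and `Error` are simplified stand-ins (assumptions) for the generated `datafusion_proto_models` types and `DataFusionError`. `expect_column` is a hypothetical caller modeled on the unit tests. The sketch shows that the macro's `return` exits the *calling* function, which is what lets it replace the hand-written `match` at the top of each `try_from_proto`.

```rust
// Simplified stand-ins (assumptions) for the generated proto types and for
// `DataFusionError`; the real types live in `datafusion_proto_models` and
// `datafusion_common`.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
struct PhysicalColumn {
    name: String,
    index: u32,
}

#[allow(dead_code)]
#[derive(Debug)]
enum ExprType {
    Column(PhysicalColumn),
    Negative(Box<PhysicalExprNode>),
}

#[derive(Debug)]
struct PhysicalExprNode {
    expr_type: Option<ExprType>,
}

#[derive(Debug, PartialEq)]
enum Error {
    Internal(String),
}

// Same shape as `expect_expr_variant!`: borrow the payload of the expected
// variant, or `return` an internal error from the *calling* function.
macro_rules! expect_expr_variant {
    ($node:expr, $variant:path, $expr_name:literal $(,)?) => {{
        match &$node.expr_type {
            ::core::option::Option::Some($variant(inner)) => inner,
            _ => {
                return Err(Error::Internal(
                    concat!("PhysicalExprNode is not a ", $expr_name).to_string(),
                ));
            }
        }
    }};
}

// Same shape as `require_proto_field`: turn a missing optional field into a
// uniformly worded internal error.
fn require_proto_field<T>(opt: Option<T>, expr_name: &str, field: &str) -> Result<T, Error> {
    opt.ok_or_else(|| {
        Error::Internal(format!("{expr_name} is missing required field '{field}'"))
    })
}

// Hypothetical caller, modeled on the `expect_column` helper in the unit tests.
fn expect_column(node: &PhysicalExprNode) -> Result<&PhysicalColumn, Error> {
    let col = expect_expr_variant!(node, ExprType::Column, "Column");
    Ok(col)
}
```

The macro expands to an early `return`, so it only works inside a function whose return type is a `Result` with a compatible error type. Every `try_from_proto` is written that way.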
**Commit 2: `refactor(physical-expr): adopt new proto helpers in already-migrated expressions`**

Ports every expression already on the new hooks:

- `Column`, `BinaryExpr` (originally #21929)
- `LikeExpr` (#22471)
- `InListExpr` (#22503)
- `NegativeExpr` (#22483)

`BinaryExpr` additionally adopts `decode_required_expression` for its legacy `l`/`r` arms and `encode_children_expressions` / `decode_children_expressions` for the linearized `operands` path. This removes two more hand-rolled "missing required field" strings.

One existing test changes its assertion text. `InListExpr`'s rejected-variant message was the only one that used the article "an" instead of "a". The macro always emits "a {expr_name}" and does not adapt the article, so the message is now "PhysicalExprNode is not a InList".

The two commits are stacked for review: commit 1 only adds the helpers, and commit 2 adopts them. Either can be reviewed in isolation.

## Are these changes tested?

Yes:

- `cargo test -p datafusion-physical-expr-common --features proto`: the new helper unit tests pass.
- `cargo test -p datafusion-physical-expr --features proto proto_tests`: 23 / 23 per-expression proto tests pass (1 assertion-string update in InList).
- `cargo test -p datafusion-proto --test proto_integration`: 173 / 173 pass; no wire-format change.
- `cargo clippy --all-targets --all-features -- -D warnings` is clean on the touched crates.

## Are there any user-facing changes?

No. The helpers add new API surface to `datafusion-physical-expr-common` (gated on `feature = "proto"`), and the macro `expect_expr_variant!` is exported at the crate root. The serialized output does not change.
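`BinaryExpr` now routes its linearized `operands` path through `encode_children_expressions` / `decode_children_expressions`. The sketch below illustrates that path under a few assumptions:

- The chain uses a single operator and is collected by walking the left spine.
- The diff does not show how `operand_refs` is gathered, nor the exact reduction on decode. The left fold here is one plausible reading of "reduce the flat operands list back into a nested binary expression tree", not the crate's implementation.
- `Expr`, `linearize_chain` and `rebuild_chain` are hypothetical names, and `Rc<Expr>` stands in for `Arc<dyn PhysicalExpr>`.

```rust
use std::rc::Rc;

// Hypothetical, simplified expression tree; `Rc<Expr>` stands in for
// `Arc<dyn PhysicalExpr>` and `op` for `Operator`.
#[derive(Debug, PartialEq)]
enum Expr {
    Column(String),
    Binary { left: Rc<Expr>, op: char, right: Rc<Expr> },
}

// Flatten a left-deep chain such as `((a + b) + c)` into `[a, b, c]`,
// stopping at the first node that is not a `Binary` with the same operator.
fn linearize_chain(expr: &Rc<Expr>, op: char) -> Vec<Rc<Expr>> {
    let mut operands = Vec::new();
    let mut current = expr;
    // Walk down the left spine; right-hand operands are collected
    // outermost first.
    while let Expr::Binary { left, op: o, right } = &**current {
        if *o != op {
            break;
        }
        operands.push(Rc::clone(right));
        current = left;
    }
    // The innermost left operand terminates the spine.
    operands.push(Rc::clone(current));
    // Reverse so operands run from left innermost to right outermost.
    operands.reverse();
    operands
}

// Rebuild the nested tree from the flat list with a left fold.
fn rebuild_chain(operands: Vec<Rc<Expr>>, op: char) -> Result<Rc<Expr>, String> {
    if operands.len() < 2 {
        return Err("A binary expression must always have at least 2 operands".to_string());
    }
    Ok(operands
        .into_iter()
        .reduce(|left, right| Rc::new(Expr::Binary { left, op, right }))
        .expect("at least two operands were checked above"))
}
```

Folding `[a, b, c]` from the left yields `((a + b) + c)` again, which matches the "left innermost to right outermost" order described in the encoder comment.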
---
 .../physical-expr-common/src/physical_expr.rs      | 149 +++++++++++++++++++++
 datafusion/physical-expr/src/expressions/binary.rs |  46 +++----
 datafusion/physical-expr/src/expressions/column.rs |  10 +-
 .../physical-expr/src/expressions/in_list.rs       |  16 +--
 datafusion/physical-expr/src/expressions/like.rs   |  13 +-
 .../physical-expr/src/expressions/negative.rs      |  14 +-
 6 files changed, 191 insertions(+), 57 deletions(-)

diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs
index 526bc97e9e..679a44e85e 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -619,6 +619,48 @@ pub mod proto_decode {
 
     use super::PhysicalExpr;
 
+    /// Open the outer [`PhysicalExprNode`] and assert it carries the expected
+    /// `ExprType` variant, returning the inner payload (auto-derefs through
+    /// `Box`) or bailing with an `Internal` error.
+    ///
+    /// Every `try_from_proto` starts with the same six-line `match`:
+    ///
+    /// ```ignore
+    /// let try_cast = match &node.expr_type {
+    ///     Some(protobuf::physical_expr_node::ExprType::TryCast(x)) => x.as_ref(),
+    ///     _ => return internal_err!("PhysicalExprNode is not a TryCastExpr"),
+    /// };
+    /// ```
+    ///
+    /// With this macro that collapses to:
+    ///
+    /// ```ignore
+    /// let try_cast = expect_expr_variant!(
+    ///     node,
+    ///     protobuf::physical_expr_node::ExprType::TryCast,
+    ///     "TryCastExpr",
+    /// );
+    /// ```
+    ///
+    /// Pass the variant as a `::` path so the macro stays agnostic to how
+    /// the caller imports the proto types.
+    #[macro_export]
+    macro_rules! expect_expr_variant {
+        ($node:expr, $variant:path, $expr_name:literal $(,)?) => {{
+            match &$node.expr_type {
+                ::core::option::Option::Some($variant(inner)) => inner,
+                _ => {
+                    return ::datafusion_common::internal_err!(concat!(
+                        "PhysicalExprNode is not a ",
+                        $expr_name
+                    ));
+                }
+            }
+        }};
+    }
+    #[doc(inline)]
+    pub use expect_expr_variant;
+
     /// Decoder context handed to per-expression `try_from_proto` constructors.
     ///
     /// Wraps an internal [`PhysicalExprDecode`] trait object plus a borrowed
@@ -693,6 +735,33 @@ pub mod proto_decode {
         }
     }
 
+    /// Unwrap a required non-expression proto field.
+    ///
+    /// Mirrors [`PhysicalExprDecodeCtx::decode_required_expression`] for proto
+    /// fields that aren't [`PhysicalExprNode`]s — e.g. the `arrow_type` of a
+    /// `PhysicalCastNode` or the `scalar` of a `PhysicalLiteralNode`. Keeps
+    /// the "missing required field" message format identical across
+    /// expressions:
+    ///
+    /// ```ignore
+    /// let arrow_type = require_proto_field(
+    ///     cast_expr.arrow_type.as_ref(),
+    ///     "CastExpr",
+    ///     "arrow_type",
+    /// )?;
+    /// ```
+    pub fn require_proto_field<T>(
+        opt: Option<T>,
+        expr_name: &str,
+        field: &str,
+    ) -> Result<T> {
+        opt.ok_or_else(|| {
+            datafusion_common::internal_datafusion_err!(
+                "{expr_name} is missing required field '{field}'"
+            )
+        })
+    }
+
     /// Internal dispatch trait. Implementors live in `datafusion-proto`.
     /// Expression authors should use [`PhysicalExprDecodeCtx`] instead of
     /// calling this directly.
@@ -1143,3 +1212,83 @@ mod test {
         );
     }
 }
+
+#[cfg(all(test, feature = "proto"))]
+mod proto_helper_tests {
+    use datafusion_common::DataFusionError;
+    use datafusion_proto_models::protobuf::{
+        self, PhysicalColumn, PhysicalExprNode, physical_expr_node,
+    };
+
+    use crate::expect_expr_variant;
+    use crate::physical_expr::proto_decode::require_proto_field;
+
+    fn column_node() -> PhysicalExprNode {
+        PhysicalExprNode {
+            expr_id: None,
+            expr_type: Some(physical_expr_node::ExprType::Column(PhysicalColumn {
+                name: "a".to_string(),
+                index: 0,
+            })),
+        }
+    }
+
+    #[test]
+    fn require_proto_field_returns_inner() {
+        let v = require_proto_field(Some(7_u32), "FooExpr", "answer").unwrap();
+        assert_eq!(v, 7);
+    }
+
+    #[test]
+    fn require_proto_field_reports_missing() {
+        let err = require_proto_field::<u32>(None, "FooExpr", "answer").unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg)
+                if msg.contains("FooExpr is missing required field 'answer'")
+        ));
+    }
+
+    fn expect_column(
+        node: &PhysicalExprNode,
+    ) -> Result<&PhysicalColumn, DataFusionError> {
+        let inner =
+            expect_expr_variant!(node, physical_expr_node::ExprType::Column, "Column",);
+        Ok(inner)
+    }
+
+    #[test]
+    fn expect_expr_variant_returns_inner_payload() {
+        let node = column_node();
+        let col = expect_column(&node).unwrap();
+        assert_eq!(col.name, "a");
+    }
+
+    #[test]
+    fn expect_expr_variant_rejects_wrong_variant() {
+        let node = PhysicalExprNode {
+            expr_id: None,
+            expr_type: Some(physical_expr_node::ExprType::Negative(Box::new(
+                protobuf::PhysicalNegativeNode { expr: None },
+            ))),
+        };
+        let err = expect_column(&node).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a Column")
+        ));
+    }
+
+    #[test]
+    fn expect_expr_variant_rejects_missing_expr_type() {
+        let node = PhysicalExprNode {
+            expr_id: None,
+            expr_type: None,
+        };
+        let err = expect_column(&node).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a Column")
+        ));
+    }
+}
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 712f8f58f3..8be783985e 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -638,10 +638,7 @@ impl PhysicalExpr for BinaryExpr {
         // Reverse so operands are ordered from left innermost to right outermost.
         operand_refs.reverse();
 
-        let operands = operand_refs
-            .iter()
-            .map(|e| ctx.encode_child(e))
-            .collect::<Result<Vec<_>>>()?;
+        let operands = ctx.encode_children_expressions(operand_refs)?;
 
         Ok(Some(protobuf::PhysicalExprNode {
             expr_id: None,
@@ -675,11 +672,13 @@ impl BinaryExpr {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
-        let node = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::BinaryExpr(b)) => b.as_ref(),
-            _ => return internal_err!("PhysicalExprNode is not a BinaryExpr"),
-        };
+        let node = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::BinaryExpr,
+            "BinaryExpr",
+        );
         let op = Operator::from_proto_name(&node.op).ok_or_else(|| {
             datafusion_common::DataFusionError::Internal(format!(
                 "Unsupported binary operator '{}'",
@@ -690,17 +689,12 @@ impl BinaryExpr {
         if !node.operands.is_empty() {
             // New linearized format: reduce the flat operands list back into
             // a nested binary expression tree.
-            let operands = node
-                .operands
-                .iter()
-                .map(|e| ctx.decode(e))
-                .collect::<Result<Vec<_>>>()?;
+            let operands = ctx.decode_children_expressions(&node.operands)?;
 
             if operands.len() < 2 {
-                return Err(datafusion_common::DataFusionError::Internal(
+                return internal_err!(
                     "A binary expression must always have at least 2 operands"
-                        .to_string(),
-                ));
+                );
             }
 
             Ok(operands
@@ -711,21 +705,11 @@ impl BinaryExpr {
                 .expect("Binary expression could not be reduced to a single expression."))
         } else {
             // Legacy format with l/r fields.
-            let left = node.l.as_deref().ok_or_else(|| {
-                datafusion_common::DataFusionError::Internal(
-                    "BinaryExpr is missing required field 'left'".to_string(),
-                )
-            })?;
-            let right = node.r.as_deref().ok_or_else(|| {
-                datafusion_common::DataFusionError::Internal(
-                    "BinaryExpr is missing required field 'right'".to_string(),
-                )
-            })?;
-            Ok(Arc::new(BinaryExpr::new(
-                ctx.decode(left)?,
-                op,
-                ctx.decode(right)?,
-            )))
+            let left =
+                ctx.decode_required_expression(node.l.as_deref(), "BinaryExpr", "left")?;
+            let right =
+                ctx.decode_required_expression(node.r.as_deref(), "BinaryExpr", "right")?;
+            Ok(Arc::new(BinaryExpr::new(left, op, right)))
         }
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index 2b1de870e7..0a96b00444 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -182,11 +182,13 @@ impl Column {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         _ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
-        let protobuf::PhysicalColumn { name, index } = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::Column(c)) => c,
-            _ => return internal_err!("PhysicalExprNode is not a Column"),
-        };
+        let protobuf::PhysicalColumn { name, index } = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::Column,
+            "Column",
+        );
         Ok(Arc::new(Column::new(name, *index as usize)))
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index ea381a0483..1d3e244d73 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -252,16 +252,14 @@ impl InListExpr {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
 
-        let node = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::InList(n)) => n,
-            _ => {
-                return datafusion_common::internal_err!(
-                    "PhysicalExprNode is not an InList"
-                );
-            }
-        };
+        let node = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::InList,
+            "InList",
+        );
 
         let expr =
             ctx.decode_required_expression(node.expr.as_deref(), "InListExpr", "expr")?;
@@ -3981,7 +3979,7 @@ mod proto_tests {
         let err = InListExpr::try_from_proto(&node, &ctx).unwrap_err();
         assert!(matches!(
             err,
-            DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not an InList")
+            DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a InList")
         ));
     }
 
diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs
index b78d67a753..7535f109a0 100644
--- a/datafusion/physical-expr/src/expressions/like.rs
+++ b/datafusion/physical-expr/src/expressions/like.rs
@@ -180,15 +180,14 @@ impl LikeExpr {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
-        use datafusion_common::internal_err;
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
 
-        let like_expr = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::LikeExpr(like_expr)) => {
-                like_expr.as_ref()
-            }
-            _ => return internal_err!("PhysicalExprNode is not a LikeExpr"),
-        };
+        let like_expr = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::LikeExpr,
+            "LikeExpr",
+        );
 
         Ok(Arc::new(LikeExpr::new(
             like_expr.negated,
diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs
index b3ede9f1e9..9fbf38361c 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -200,14 +200,16 @@ impl NegativeExpr {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
 
-        let expr = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::Negative(n)) => {
-                ctx.decode_required_expression(n.expr.as_deref(), "NegativeExpr", "expr")?
-            }
-            _ => return internal_err!("PhysicalExprNode is not a Negative"),
-        };
+        let n = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::Negative,
+            "Negative",
+        );
+        let expr =
+            ctx.decode_required_expression(n.expr.as_deref(), "NegativeExpr", "expr")?;
 
         Ok(Arc::new(NegativeExpr::new(expr)))
     }
