This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22463-eedae1154bf2745ea6d025f3e55901db1d8b7fb7
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 2fc3b1dff95d729cdf6e912833981c87d5a30b03
Author: Tian Teng <[email protected]>
AuthorDate: Thu May 28 22:22:59 2026 +0200

    Port NotExpr proto hooks (#22463)
    
    ## Which issue does this PR close?
    
    - Closes #22422.
    
    ## Rationale for this change
    
    `NotExpr` still used the central physical expression protobuf downcast
    path. Moving it to the expression-level proto hook keeps it aligned with
    the newer serialization pattern and reduces the special-case branching
    in the shared conversion code.
    
    ## What changes are included in this PR?
    
    - Move `NotExpr` protobuf serialization into its `try_to_proto` hook.
    - Add `NotExpr::try_from_proto` and route decode through it.
    - Remove the old central `to_proto` downcast branch for `NotExpr`.
    
    ## Are these changes tested?
    
    Yes. I ran:
    
    - `cargo fmt --all -- --check`
    - `cargo check -p datafusion-physical-expr --features proto`
    - `cargo check -p datafusion-proto`
    - `cargo test -p datafusion-proto --test proto_integration roundtrip_filter_with_not`
    - `git diff --check`
    
    ## Are there any user-facing changes?
    
    No. This is an internal proto serialization refactor and should not
    change query behavior or public APIs.
    
    ---------
    
    Signed-off-by: Herrtian <[email protected]>
    Co-authored-by: Kumar Ujjawal <[email protected]>
---
 datafusion/physical-expr/src/expressions/not.rs  | 148 +++++++++++++++++++++++
 datafusion/proto/src/physical_plan/from_proto.rs |   8 +-
 datafusion/proto/src/physical_plan/to_proto.rs   |  15 +--
 3 files changed, 151 insertions(+), 20 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs
index b63effdbb9..f856dd568a 100644
--- a/datafusion/physical-expr/src/expressions/not.rs
+++ b/datafusion/physical-expr/src/expressions/not.rs
@@ -181,6 +181,45 @@ impl PhysicalExpr for NotExpr {
         write!(f, "NOT ")?;
         self.arg.fmt_sql(f)
     }
+
+    #[cfg(feature = "proto")]
+    fn try_to_proto(
+        &self,
+        ctx: 
&datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
+    ) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
+        use datafusion_proto_models::protobuf;
+
+        Ok(Some(protobuf::PhysicalExprNode {
+            expr_id: None,
+            expr_type: 
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
+                protobuf::PhysicalNot {
+                    expr: Some(Box::new(ctx.encode_child(&self.arg)?)),
+                },
+            ))),
+        }))
+    }
+}
+
+#[cfg(feature = "proto")]
+impl NotExpr {
+    /// Reconstruct a [`NotExpr`] from its protobuf representation.
+    pub fn try_from_proto(
+        node: &datafusion_proto_models::protobuf::PhysicalExprNode,
+        ctx: 
&datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
+        use datafusion_proto_models::protobuf;
+
+        let not_expr = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::NotExpr,
+            "NotExpr",
+        );
+        let expr =
+            ctx.decode_required_expression(not_expr.expr.as_deref(), 
"NotExpr", "expr")?;
+
+        Ok(Arc::new(NotExpr::new(expr)))
+    }
 }
 
 /// Creates a unary expression NOT
@@ -357,3 +396,112 @@ mod tests {
         Arc::clone(&SCHEMA)
     }
 }
+
+/// Tests for the `try_to_proto` / `try_from_proto` hooks.
+#[cfg(all(test, feature = "proto"))]
+mod proto_tests {
+    use super::*;
+    use crate::expressions::{Column, col};
+    use crate::proto_test_util::{
+        StubDecoder, StubEncoder, UnreachableDecoder, column_node,
+    };
+    use arrow::datatypes::Field;
+    use datafusion_common::DataFusionError;
+    use 
datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx;
+    use 
datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx;
+    use datafusion_proto_models::protobuf::{
+        PhysicalExprNode, PhysicalNot, physical_expr_node,
+    };
+
+    /// Build a `NotExpr` proto node with the given child.
+    fn not_node(expr: Option<Box<PhysicalExprNode>>) -> PhysicalExprNode {
+        PhysicalExprNode {
+            expr_id: None,
+            expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new(
+                PhysicalNot { expr },
+            ))),
+        }
+    }
+
+    /// A `NotExpr` over a boolean column.
+    fn not_fixture() -> NotExpr {
+        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, 
true)]);
+        NotExpr::new(col("a", &schema).unwrap())
+    }
+
+    #[test]
+    fn try_to_proto_encodes_not_expr() {
+        let not = not_fixture();
+        let encoder = StubEncoder::ok();
+        let ctx = PhysicalExprEncodeCtx::new(&encoder);
+
+        let node = not
+            .try_to_proto(&ctx)
+            .unwrap()
+            .expect("NotExpr should encode to Some(node)");
+
+        assert!(node.expr_id.is_none());
+        let not_node = match node.expr_type {
+            Some(physical_expr_node::ExprType::NotExpr(boxed)) => *boxed,
+            other => panic!("expected a NotExpr node, got {other:?}"),
+        };
+        assert!(not_node.expr.is_some());
+    }
+
+    #[test]
+    fn try_to_proto_propagates_expr_encode_error() {
+        let not = not_fixture();
+        let encoder = StubEncoder::failing_on(1);
+        let ctx = PhysicalExprEncodeCtx::new(&encoder);
+        let err = not.try_to_proto(&ctx).unwrap_err();
+        assert!(matches!(err, DataFusionError::Internal(msg) if 
msg.contains("call 1")));
+    }
+
+    #[test]
+    fn try_from_proto_decodes_not_expr() {
+        let node = not_node(Some(Box::new(column_node("a"))));
+        let schema = Schema::empty();
+        let decoder = StubDecoder::ok();
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+
+        let decoded = NotExpr::try_from_proto(&node, &ctx).unwrap();
+        let not = decoded
+            .downcast_ref::<NotExpr>()
+            .expect("decoded expr should be a NotExpr");
+        assert!(not.arg().downcast_ref::<Column>().is_some());
+    }
+
+    #[test]
+    fn try_from_proto_rejects_non_not_node() {
+        let node = column_node("a");
+        let schema = Schema::empty();
+        let decoder = UnreachableDecoder;
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+        let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err();
+        assert!(
+            matches!(err, DataFusionError::Internal(msg) if 
msg.contains("PhysicalExprNode is not a NotExpr"))
+        );
+    }
+
+    #[test]
+    fn try_from_proto_rejects_missing_expr() {
+        let node = not_node(None);
+        let schema = Schema::empty();
+        let decoder = UnreachableDecoder;
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+        let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err();
+        assert!(
+            matches!(err, DataFusionError::Internal(msg) if 
msg.contains("NotExpr is missing required field 'expr'"))
+        );
+    }
+
+    #[test]
+    fn try_from_proto_propagates_expr_decode_error() {
+        let node = not_node(Some(Box::new(column_node("a"))));
+        let schema = Schema::empty();
+        let decoder = StubDecoder::failing_on(1);
+        let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
+        let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err();
+        assert!(matches!(err, DataFusionError::Internal(msg) if 
msg.contains("call 1")));
+    }
+}
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 41f470aeb6..6d0898d7f5 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -316,13 +316,7 @@ pub fn parse_physical_expr_with_converter(
                 proto_converter,
             )?))
         }
-        ExprType::NotExpr(e) => 
Arc::new(NotExpr::new(parse_required_physical_expr(
-            e.expr.as_deref(),
-            ctx,
-            "expr",
-            input_schema,
-            proto_converter,
-        )?)),
+        ExprType::NotExpr(_) => NotExpr::try_from_proto(proto, &decode_ctx)?,
         ExprType::Negative(_) => NegativeExpr::try_from_proto(proto, 
&decode_ctx)?,
         ExprType::InList(_) => InListExpr::try_from_proto(proto, &decode_ctx)?,
         ExprType::Case(e) => Arc::new(CaseExpr::try_new(
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 5c45c27502..6febf15835 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -36,8 +36,8 @@ use 
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
 use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, 
StandardWindowExpr};
 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use datafusion_physical_plan::expressions::{
-    CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, 
NotExpr,
-    TryCastExpr, UnKnownColumn,
+    CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, 
TryCastExpr,
+    UnKnownColumn,
 };
 use datafusion_physical_plan::udaf::AggregateFunctionExpr;
 use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, 
WindowUDFExpr};
@@ -355,17 +355,6 @@ pub fn serialize_physical_expr_with_converter(
                 ),
             ),
         })
-    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
-        Ok(protobuf::PhysicalExprNode {
-            expr_id,
-            expr_type: 
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
-                protobuf::PhysicalNot {
-                    expr: Some(Box::new(
-                        proto_converter.physical_expr_to_proto(expr.arg(), 
codec)?,
-                    )),
-                },
-            ))),
-        })
     } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
         Ok(protobuf::PhysicalExprNode {
             expr_id,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to