This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 58735ea30efed9da4b9d0b156aabe156ecff7ca3
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 15 11:09:17 2026 -0500

    NIFI-15446: When invoking ConnectorMethod, make sure to serialize/des… 
(#10750)
    
    * NIFI-15446: When invoking ConnectorMethod, make sure to 
serialize/deserialize the arguments and return value
    
    * NIFI-15446: Addressed review feedback
---
 .../nifi/controller/StandardProcessorNode.java     |  40 +++-
 .../service/StandardControllerServiceNode.java     |  41 +++-
 .../org/apache/nifi/controller/ProcessorNode.java  |   2 +-
 .../controller/service/ControllerServiceNode.java  |   2 +-
 .../StandaloneControllerServiceFacade.java         |  50 +++-
 .../standalone/StandaloneProcessorFacade.java      |  52 ++++-
 .../tests/system/CalculateConnector.java           | 257 +++++++++++++++++++++
 .../nifi/processors/tests/system/Calculate.java    |  71 ++++++
 .../org.apache.nifi.components.connector.Connector |   1 +
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../connectors/ConnectorMethodMarshallingIT.java   |  71 ++++++
 11 files changed, 559 insertions(+), 29 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 3823428856..7570cfe9e1 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.controller;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Restricted;
@@ -140,7 +144,9 @@ import java.util.stream.Stream;
 public class StandardProcessorNode extends ProcessorNode implements 
Connectable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StandardProcessorNode.class);
-
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+        .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
 
     public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
     public static final String DEFAULT_YIELD_PERIOD = "1 sec";
@@ -1941,31 +1947,36 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
     }
 
     @Override
-    public Object invokeConnectorMethod(final String methodName, final 
Map<String, Object> arguments, final ProcessContext processContext) throws 
InvocationFailedException {
+    public String invokeConnectorMethod(final String methodName, final 
Map<String, String> jsonArguments, final ProcessContext processContext) throws 
InvocationFailedException {
         final ConfigurableComponent component = getComponent();
 
         try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
component.getClass(), getIdentifier())) {
             final Method implementationMethod = 
discoverConnectorMethod(component.getClass(), methodName);
             final MethodArgument[] methodArguments = 
getConnectorMethodArguments(methodName, implementationMethod, component);
             final List<Object> argumentValues = new ArrayList<>();
+
             for (final MethodArgument methodArgument : methodArguments) {
-                final Object argumentValue = 
arguments.get(methodArgument.name());
                 if (ProcessContext.class.equals(methodArgument.type())) {
                     continue;
                 }
 
-                if (argumentValue == null && methodArgument.required()) {
+                final String jsonValue = 
jsonArguments.get(methodArgument.name());
+                if (jsonValue == null && methodArgument.required()) {
                     throw new IllegalArgumentException("Cannot invoke 
Connector Method '" + methodName + "' on " + this + " because the required 
argument '"
                         + methodArgument.name() + "' was not provided");
                 }
 
-                if (argumentValue != null && 
!(methodArgument.type().isAssignableFrom(argumentValue.getClass()))) {
-                    throw new IllegalArgumentException("Cannot invoke 
Connector Method '" + methodName + "' on " + this + " because the argument '"
-                        + methodArgument.name() + "' is of type " + 
argumentValue.getClass().getName() + " defined by " + 
argumentValue.getClass().getClassLoader()
-                        + " but the method expects type " + 
methodArgument.type().getName() + " defined by " + 
methodArgument.type().getClassLoader());
+                if (jsonValue == null) {
+                    argumentValues.add(null);
+                } else {
+                    try {
+                        final Object argumentValue = 
OBJECT_MAPPER.readValue(jsonValue, methodArgument.type());
+                        argumentValues.add(argumentValue);
+                    } catch (final JsonProcessingException e) {
+                        throw new InvocationFailedException("Failed to 
deserialize argument '" + methodArgument.name() + "' as type " + 
methodArgument.type().getName() +
+                                                            " for Connector 
Method '" + methodName + "' on " + this, e);
+                    }
                 }
-
-                argumentValues.add(argumentValue);
             }
 
             // Inject ProcessContext if the method signature supports it
@@ -1979,7 +1990,14 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
             try {
                 implementationMethod.setAccessible(true);
-                return implementationMethod.invoke(component, 
argumentValues.toArray());
+                final Object result = implementationMethod.invoke(component, 
argumentValues.toArray());
+                if (result == null) {
+                    return null;
+                }
+
+                return OBJECT_MAPPER.writeValueAsString(result);
+            } catch (final JsonProcessingException e) {
+                throw new InvocationFailedException("Failed to serialize 
return value for Connector Method '" + methodName + "' on " + this, e);
             } catch (final Exception e) {
                 throw new InvocationFailedException(e);
             }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 57efd27d08..6bcc7a2c4d 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.controller.service;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -104,6 +108,9 @@ import java.util.stream.Collectors;
 public class StandardControllerServiceNode extends AbstractComponentNode 
implements ControllerServiceNode {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StandardControllerServiceNode.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+        .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
 
     private static final long INCREMENTAL_VALIDATION_DELAY_MS = 1000;
     private static final Duration MAXIMUM_DELAY = Duration.ofMinutes(10);
@@ -904,37 +911,40 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
         }
     }
 
-    // TODO: Refactor, this is the same between ProcessorNode / 
ControllerServiceNode except for getComponentState
     @Override
-    public Object invokeConnectorMethod(final String methodName, final 
Map<String, Object> arguments, final ConfigurationContext configurationContext) 
throws InvocationFailedException {
+    public String invokeConnectorMethod(final String methodName, final 
Map<String, String> jsonArguments, final ConfigurationContext 
configurationContext) throws InvocationFailedException {
         final ConfigurableComponent component = getComponent();
 
         try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
component.getClass(), getIdentifier())) {
             final Method implementationMethod = 
discoverConnectorMethod(component.getClass(), methodName);
             final MethodArgument[] methodArguments = 
getConnectorMethodArguments(methodName, implementationMethod, component);
             final List<Object> argumentValues = new ArrayList<>();
+
             for (final MethodArgument methodArgument : methodArguments) {
-                final Object argumentValue = 
arguments.get(methodArgument.name());
                 if (ConfigurationContext.class.equals(methodArgument.type())) {
                     continue;
                 }
 
-                if (argumentValue == null && methodArgument.required()) {
+                final String jsonValue = 
jsonArguments.get(methodArgument.name());
+                if (jsonValue == null && methodArgument.required()) {
                     throw new IllegalArgumentException("Cannot invoke 
Connector Method '" + methodName + "' on " + this + " because the required 
argument '"
                                                        + methodArgument.name() 
+ "' was not provided");
                 }
 
-                if (argumentValue != null && 
!(methodArgument.type().isAssignableFrom(argumentValue.getClass()))) {
-                    throw new IllegalArgumentException("Cannot invoke 
Connector Method '" + methodName + "' on " + this + " because the argument '"
-                                                       + methodArgument.name() 
+ "' is of type " + argumentValue.getClass().getName() + " defined by " + 
argumentValue.getClass().getClassLoader()
-                                                       + " but the method 
expects type " + methodArgument.type().getName() + " defined by " + 
methodArgument.type().getClassLoader());
+                if (jsonValue == null) {
+                    argumentValues.add(null);
+                } else {
+                    try {
+                        final Object argumentValue = 
OBJECT_MAPPER.readValue(jsonValue, methodArgument.type());
+                        argumentValues.add(argumentValue);
+                    } catch (final JsonProcessingException e) {
+                        throw new InvocationFailedException("Failed to 
deserialize argument '" + methodArgument.name() + "' as type " + 
methodArgument.type().getName()
+                                                            + " for Connector 
Method '" + methodName + "' on " + this, e);
+                    }
                 }
-
-                argumentValues.add(argumentValue);
             }
 
             // Inject ConfigurationContext if the method signature supports it
-            // TODO: Can we move away from Maps and instead just use 
reflection to get the arguments directly?
             final Class<?>[] argumentTypes = 
implementationMethod.getParameterTypes();
             if (argumentTypes.length > 0 && 
ConfigurationContext.class.isAssignableFrom(argumentTypes[0])) {
                 argumentValues.addFirst(configurationContext);
@@ -945,7 +955,14 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
 
             try {
                 implementationMethod.setAccessible(true);
-                return implementationMethod.invoke(component, 
argumentValues.toArray());
+                final Object result = implementationMethod.invoke(component, 
argumentValues.toArray());
+                if (result == null) {
+                    return null;
+                }
+
+                return OBJECT_MAPPER.writeValueAsString(result);
+            } catch (final JsonProcessingException e) {
+                throw new InvocationFailedException("Failed to serialize 
return value for Connector Method '" + methodName + "' on " + this, e);
             } catch (final Exception e) {
                 throw new InvocationFailedException(e);
             }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 001fae6785..fce8eeb21e 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -312,7 +312,7 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
 
     public abstract void migrateConfiguration(Map<String, String> 
originalPropertyValues, ControllerServiceFactory serviceFactory);
 
-    public abstract Object invokeConnectorMethod(String methodName, 
Map<String, Object> arguments, ProcessContext processContext) throws 
InvocationFailedException;
+    public abstract String invokeConnectorMethod(String methodName, 
Map<String, String> jsonArguments, ProcessContext processContext) throws 
InvocationFailedException;
 
     public abstract List<ConnectorMethod> getConnectorMethods();
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 2b2f78723e..16b8b7e8d3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -263,7 +263,7 @@ public interface ControllerServiceNode extends 
ComponentNode, VersionedComponent
 
     void migrateConfiguration(Map<String, String> originalPropertyValues, 
ControllerServiceFactory serviceFactory);
 
-    Object invokeConnectorMethod(String methodName, Map<String, Object> 
arguments, ConfigurationContext configurationContext) throws 
InvocationFailedException;
+    String invokeConnectorMethod(String methodName, Map<String, String> 
jsonArguments, ConfigurationContext configurationContext) throws 
InvocationFailedException;
 
     List<ConnectorMethod> getConnectorMethods();
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java
index 1ec92cad5c..59cbb92516 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.components.connector.facades.standalone;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.asset.AssetManager;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ValidationContext;
@@ -39,10 +43,15 @@ import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.parameter.ParameterContext;
 import org.apache.nifi.parameter.ParameterLookup;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class StandaloneControllerServiceFacade implements 
ControllerServiceFacade {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+        .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
+
     private final ControllerServiceNode controllerServiceNode;
     private final VersionedControllerService versionedControllerService;
     private final ParameterContext parameterContext;
@@ -161,12 +170,49 @@ public class StandaloneControllerServiceFacade implements 
ControllerServiceFacad
 
     @Override
     public Object invokeConnectorMethod(final String methodName, final 
Map<String, Object> arguments) throws InvocationFailedException {
+        final Map<String, String> jsonArguments = 
serializeArgumentsToJson(arguments);
         final ConfigurationContext configurationContext = 
componentContextProvider.createConfigurationContext(controllerServiceNode, 
parameterContext);
-        return controllerServiceNode.invokeConnectorMethod(methodName, 
arguments, configurationContext);
+        final String jsonResult = 
controllerServiceNode.invokeConnectorMethod(methodName, jsonArguments, 
configurationContext);
+        if (jsonResult == null) {
+            return null;
+        }
+
+        try {
+            return OBJECT_MAPPER.readValue(jsonResult, Object.class);
+        } catch (final JsonProcessingException e) {
+            throw new InvocationFailedException("Failed to deserialize return 
value from Connector Method '" + methodName + "'", e);
+        }
     }
 
     @Override
     public <T> T invokeConnectorMethod(final String methodName, final 
Map<String, Object> arguments, final Class<T> returnType) throws 
InvocationFailedException {
-        return returnType.cast(invokeConnectorMethod(methodName, arguments));
+        final Map<String, String> jsonArguments = 
serializeArgumentsToJson(arguments);
+        final ConfigurationContext configurationContext = 
componentContextProvider.createConfigurationContext(controllerServiceNode, 
parameterContext);
+        final String jsonResult = 
controllerServiceNode.invokeConnectorMethod(methodName, jsonArguments, 
configurationContext);
+        if (jsonResult == null) {
+            return null;
+        }
+
+        try {
+            return OBJECT_MAPPER.readValue(jsonResult, returnType);
+        } catch (final JsonProcessingException e) {
+            throw new InvocationFailedException("Failed to deserialize return 
value from Connector Method '" + methodName + "'", e);
+        }
+    }
+
+    private Map<String, String> serializeArgumentsToJson(final Map<String, 
Object> arguments) throws InvocationFailedException {
+        final Map<String, String> jsonArguments = new HashMap<>();
+        for (final Map.Entry<String, Object> entry : arguments.entrySet()) {
+            if (entry.getValue() == null) {
+                jsonArguments.put(entry.getKey(), null);
+            } else {
+                try {
+                    jsonArguments.put(entry.getKey(), 
OBJECT_MAPPER.writeValueAsString(entry.getValue()));
+                } catch (final JsonProcessingException e) {
+                    throw new InvocationFailedException("Failed to serialize 
argument '" + entry.getKey() + "' to JSON", e);
+                }
+            }
+        }
+        return jsonArguments;
     }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java
index 5fe3f9ef83..707aa8b67c 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.components.connector.facades.standalone;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.asset.AssetManager;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ValidationContext;
@@ -41,10 +45,15 @@ import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.processor.ProcessContext;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class StandaloneProcessorFacade implements ProcessorFacade {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+        .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
+
     private final ProcessorNode processorNode;
     private final VersionedProcessor versionedProcessor;
     private final ParameterContext parameterContext;
@@ -169,12 +178,51 @@ public class StandaloneProcessorFacade implements 
ProcessorFacade {
 
     @Override
     public Object invokeConnectorMethod(final String methodName, final 
Map<String, Object> arguments) throws InvocationFailedException {
+        final Map<String, String> jsonArguments = 
serializeArgumentsToJson(arguments);
         final ProcessContext processContext = 
componentContextProvider.createProcessContext(processorNode, parameterContext);
-        return processorNode.invokeConnectorMethod(methodName, arguments, 
processContext);
+        final String jsonResult = 
processorNode.invokeConnectorMethod(methodName, jsonArguments, processContext);
+        if (jsonResult == null) {
+            return null;
+        }
+
+        try {
+            return OBJECT_MAPPER.readValue(jsonResult, Object.class);
+        } catch (final JsonProcessingException e) {
+            throw new InvocationFailedException("Failed to deserialize return 
value from Connector Method '" + methodName + "'", e);
+        }
     }
 
     @Override
     public <T> T invokeConnectorMethod(final String methodName, final 
Map<String, Object> arguments, final Class<T> returnType) throws 
InvocationFailedException {
-        return returnType.cast(invokeConnectorMethod(methodName, arguments));
+        final Map<String, String> jsonArguments = 
serializeArgumentsToJson(arguments);
+        final ProcessContext processContext = 
componentContextProvider.createProcessContext(processorNode, parameterContext);
+        final String jsonResult = 
processorNode.invokeConnectorMethod(methodName, jsonArguments, processContext);
+        if (jsonResult == null) {
+            return null;
+        }
+
+        try {
+            return OBJECT_MAPPER.readValue(jsonResult, returnType);
+        } catch (final JsonProcessingException e) {
+            throw new InvocationFailedException("Failed to deserialize return 
value from Connector Method '" + methodName + "'", e);
+        }
+    }
+
+    private Map<String, String> serializeArgumentsToJson(final Map<String, 
Object> arguments) throws InvocationFailedException {
+        final Map<String, String> jsonArguments = new HashMap<>();
+
+        for (final Map.Entry<String, Object> entry : arguments.entrySet()) {
+            if (entry.getValue() == null) {
+                jsonArguments.put(entry.getKey(), null);
+            } else {
+                try {
+                    jsonArguments.put(entry.getKey(), 
OBJECT_MAPPER.writeValueAsString(entry.getValue()));
+                } catch (final JsonProcessingException e) {
+                    throw new InvocationFailedException("Failed to serialize 
argument '" + entry.getKey() + "' to JSON", e);
+                }
+            }
+        }
+
+        return jsonArguments;
     }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
new file mode 100644
index 0000000000..db43577038
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectors.tests.system;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.ConnectorPropertyDescriptor;
+import org.apache.nifi.components.connector.ConnectorPropertyGroup;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.InvocationFailedException;
+import org.apache.nifi.components.connector.PropertyType;
+import org.apache.nifi.components.connector.StepConfigurationContext;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.components.connector.components.ProcessorFacade;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A test connector that invokes a ConnectorMethod on a CalculateProcessor 
using its own POJO types.
+ * This connector is used to test JSON marshalling of complex objects across 
ClassLoader boundaries.
+ */
+public class CalculateConnector extends AbstractConnector {
+
+    /**
+     * A POJO representing a calculation request. This is intentionally a 
different class than
+     * the Processor's record type to test cross-ClassLoader JSON marshalling.
+     */
+    public static class Calculation {
+        private int operand1;
+        private int operand2;
+        private String operation;
+
+        public Calculation() {
+        }
+
+        public Calculation(final int operand1, final int operand2, final 
String operation) {
+            this.operand1 = operand1;
+            this.operand2 = operand2;
+            this.operation = operation;
+        }
+
+        public int getOperand1() {
+            return operand1;
+        }
+
+        public void setOperand1(final int operand1) {
+            this.operand1 = operand1;
+        }
+
+        public int getOperand2() {
+            return operand2;
+        }
+
+        public void setOperand2(final int operand2) {
+            this.operand2 = operand2;
+        }
+
+        public String getOperation() {
+            return operation;
+        }
+
+        public void setOperation(final String operation) {
+            this.operation = operation;
+        }
+    }
+
+    /**
+     * A POJO representing the result of a calculation. This is intentionally 
a different class than
+     * the Processor's record type to test cross-ClassLoader JSON marshalling.
+     */
+    public static class CalculatedResult {
+        private Calculation calculation;
+        private int result;
+
+        public CalculatedResult() {
+        }
+
+        public Calculation getCalculation() {
+            return calculation;
+        }
+
+        public void setCalculation(final Calculation calculation) {
+            this.calculation = calculation;
+        }
+
+        public int getResult() {
+            return result;
+        }
+
+        public void setResult(final int result) {
+            this.result = result;
+        }
+    }
+
+    private static final ConnectorPropertyDescriptor OPERAND_1 = new 
ConnectorPropertyDescriptor.Builder()
+        .name("Operand 1")
+        .description("The first operand for the calculation")
+        .type(PropertyType.STRING)
+        .required(true)
+        .addValidator(StandardValidators.INTEGER_VALIDATOR)
+        .build();
+
+    private static final ConnectorPropertyDescriptor OPERAND_2 = new 
ConnectorPropertyDescriptor.Builder()
+        .name("Operand 2")
+        .description("The second operand for the calculation")
+        .type(PropertyType.STRING)
+        .required(true)
+        .addValidator(StandardValidators.INTEGER_VALIDATOR)
+        .build();
+
+    private static final ConnectorPropertyDescriptor OPERATION = new 
ConnectorPropertyDescriptor.Builder()
+        .name("Operation")
+        .description("The operation to perform (ADD, SUBTRACT, MULTIPLY, 
DIVIDE)")
+        .type(PropertyType.STRING)
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    private static final ConnectorPropertyDescriptor OUTPUT_FILE = new 
ConnectorPropertyDescriptor.Builder()
+        .name("Output File")
+        .description("The file to write the calculation result to")
+        .type(PropertyType.STRING)
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    private static final ConnectorPropertyGroup CALCULATION_GROUP = new 
ConnectorPropertyGroup.Builder()
+        .name("Calculation Configuration")
+        .description("Configuration properties for the calculation")
+        .properties(List.of(OPERAND_1, OPERAND_2, OPERATION, OUTPUT_FILE))
+        .build();
+
+    private static final ConfigurationStep CALCULATION_STEP = new 
ConfigurationStep.Builder()
+        .name("Calculation")
+        .description("Configure the calculation parameters")
+        .propertyGroups(List.of(CALCULATION_GROUP))
+        .build();
+
+    @Override
+    public VersionedExternalFlow getInitialFlow() {
+        final VersionedProcessGroup group = new VersionedProcessGroup();
+        group.setName("Calculate Flow");
+        group.setIdentifier("calculate-flow-id");
+
+        final Bundle bundle = new Bundle();
+        bundle.setGroup("org.apache.nifi");
+        bundle.setArtifact("nifi-system-test-extensions-nar");
+        bundle.setVersion("2.7.0-SNAPSHOT");
+
+        final VersionedProcessor processor = new VersionedProcessor();
+        processor.setIdentifier("calculate-processor-id");
+        processor.setName("Calculate Processor");
+        processor.setType("org.apache.nifi.processors.tests.system.Calculate");
+        processor.setBundle(bundle);
+        processor.setProperties(Map.of());
+        processor.setPropertyDescriptors(Map.of());
+        processor.setScheduledState(ScheduledState.ENABLED);
+        processor.setBulletinLevel("WARN");
+        processor.setPosition(new Position(0D, 0D));
+        processor.setPenaltyDuration("30 sec");
+        processor.setAutoTerminatedRelationships(Set.of());
+        processor.setExecutionNode("ALL");
+        processor.setGroupIdentifier(group.getIdentifier());
+        processor.setConcurrentlySchedulableTaskCount(1);
+        processor.setRunDurationMillis(0L);
+        processor.setSchedulingStrategy("TIMER_DRIVEN");
+        processor.setYieldDuration("1 sec");
+        processor.setSchedulingPeriod("0 sec");
+        processor.setStyle(Map.of());
+        group.setProcessors(Set.of(processor));
+
+        final VersionedExternalFlow flow = new VersionedExternalFlow();
+        flow.setFlowContents(group);
+        flow.setParameterContexts(Map.of());
+        return flow;
+    }
+
+    @Override
+    public List<ConfigurationStep> getConfigurationSteps() {
+        return List.of(CALCULATION_STEP);
+    }
+
+    @Override
+    protected void onStepConfigured(final String stepName, final FlowContext 
workingContext) {
+    }
+
+    @Override
+    public void applyUpdate(final FlowContext workingContext, final 
FlowContext activeContext) throws FlowUpdateException {
+        final StepConfigurationContext stepContext = 
workingContext.getConfigurationContext().scopedToStep(CALCULATION_STEP);
+        final int operand1 = stepContext.getProperty(OPERAND_1).asInteger();
+        final int operand2 = stepContext.getProperty(OPERAND_2).asInteger();
+        final String operation = stepContext.getProperty(OPERATION).getValue();
+        final String outputFile = 
stepContext.getProperty(OUTPUT_FILE).getValue();
+
+        final ProcessorFacade processorFacade = 
workingContext.getRootGroup().getProcessors().stream()
+            .filter(p -> p.getDefinition().getType().endsWith("Calculate"))
+            .findFirst()
+            .orElseThrow(() -> new FlowUpdateException("CalculateProcessor not 
found in flow"));
+
+        final Calculation calculation = new Calculation(operand1, operand2, 
operation);
+        final CalculatedResult result;
+        try {
+            result = processorFacade.invokeConnectorMethod("calculate", 
Map.of("calculation", calculation), CalculatedResult.class);
+        } catch (final InvocationFailedException e) {
+            throw new FlowUpdateException("Failed to invoke calculate method", 
e);
+        }
+
+        final File file = new File(outputFile);
+        try (final FileWriter writer = new FileWriter(file)) {
+            
writer.write(String.valueOf(result.getCalculation().getOperand1()));
+            writer.write("\n");
+            
writer.write(String.valueOf(result.getCalculation().getOperand2()));
+            writer.write("\n");
+            writer.write(result.getCalculation().getOperation());
+            writer.write("\n");
+            writer.write(String.valueOf(result.getResult()));
+        } catch (final IOException e) {
+            throw new FlowUpdateException("Failed to write result to file", e);
+        }
+
+        getLogger().info("Calculation result: {} {} {} = {}", operand1, 
operation, operand2, result.getResult());
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verifyConfigurationStep(final String 
stepName, final Map<String, String> propertyValueOverrides, final FlowContext 
flowContext) {
+        return List.of();
+    }
+}
+
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java
new file mode 100644
index 0000000000..d8bc65b8a5
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.connector.components.ComponentState;
+import org.apache.nifi.components.connector.components.ConnectorMethod;
+import org.apache.nifi.components.connector.components.MethodArgument;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * A test processor that exposes a ConnectorMethod for performing calculations.
+ * This processor is used to test JSON marshalling of complex objects across 
ClassLoader boundaries.
+ */
+public class Calculate extends AbstractProcessor {
+
+    @ConnectorMethod(
+        name = "calculate",
+        description = "Performs a calculation based on the provided 
Calculation object",
+        allowedStates = ComponentState.STOPPED,
+        arguments = {
+            @MethodArgument(name = "calculation", type = Calculation.class, 
description = "The calculation to perform", required = true)
+        }
+    )
+    public Object calculate(final Calculation calculation) {
+        final int result = switch (calculation.operation()) {
+            case "ADD" -> calculation.operand1() + calculation.operand2();
+            case "SUBTRACT" -> calculation.operand1() - calculation.operand2();
+            case "MULTIPLY" -> calculation.operand1() * calculation.operand2();
+            case "DIVIDE" -> calculation.operand1() / calculation.operand2();
+            default -> throw new IllegalArgumentException("Unknown operation: 
" + calculation.operation());
+        };
+
+        return new CalculatedResult(calculation, result);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext processContext, final 
ProcessSession processSession) throws ProcessException {
+    }
+
+    /**
+     * A record representing a calculation request with two operands and an 
operation.
+     */
+    public record Calculation(int operand1, int operand2, String operation) {
+    }
+
+    /**
+     * A record representing the result of a calculation, including the 
original calculation and the result.
+     */
+    public record CalculatedResult(Calculation calculation, int result) {
+    }
+
+}
+
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index a54f75d132..34fe7c90ea 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -17,3 +17,4 @@ org.apache.nifi.connectors.tests.system.NopConnector
 org.apache.nifi.connectors.tests.system.AssetConnector
 org.apache.nifi.connectors.tests.system.DataQueuingConnector
 org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector
+org.apache.nifi.connectors.tests.system.CalculateConnector
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d0a01b38b5..93e561e6d8 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -62,3 +62,4 @@ 
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
 org.apache.nifi.processors.tests.system.WriteLifecycleEvents
 org.apache.nifi.processors.tests.system.WriteToFile
 org.apache.nifi.processors.tests.system.YieldSource
+org.apache.nifi.processors.tests.system.Calculate
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java
new file mode 100644
index 0000000000..e57bf852d2
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System tests that verify JSON marshalling of complex objects across 
ClassLoader boundaries
+ * when invoking ConnectorMethods on Processors.
+ */
+public class ConnectorMethodMarshallingIT extends NiFiSystemIT {
+
+    @Test
+    public void testComplexObjectMarshalling() throws NiFiClientException, 
IOException, InterruptedException {
+        final File outputFile = new File("target/calculate-result.txt");
+        if (outputFile.exists()) {
+            assertTrue(outputFile.delete(), "Failed to delete existing output 
file");
+        }
+
+        final ConnectorEntity connector = 
getClientUtil().createConnector("CalculateConnector");
+        assertNotNull(connector);
+
+        getClientUtil().configureConnector(connector, "Calculation", Map.of(
+            "Operand 1", "10",
+            "Operand 2", "5",
+            "Operation", "ADD",
+            "Output File", outputFile.getAbsolutePath()
+        ));
+
+        getClientUtil().applyConnectorUpdate(connector);
+        getClientUtil().waitForValidConnector(connector.getId());
+
+        assertTrue(outputFile.exists(), "Output file was not created");
+        final String fileContents = Files.readString(outputFile.toPath(), 
StandardCharsets.UTF_8);
+        final String[] lines = fileContents.split("\n");
+        assertEquals(4, lines.length, "Output file should contain 4 lines");
+        assertEquals("10", lines[0], "First line should be operand1");
+        assertEquals("5", lines[1], "Second line should be operand2");
+        assertEquals("ADD", lines[2], "Third line should be the operation");
+        assertEquals("15", lines[3], "Fourth line should be the result (10 + 5 
= 15)");
+    }
+}
+


Reply via email to