This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 5bed6fd393 NIFI-15446: When invoking ConnectorMethod, make sure to
serialize/des… (#10750)
5bed6fd393 is described below
commit 5bed6fd3930f11f1b2b5e81384f3c4f0723c8091
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 15 11:09:17 2026 -0500
NIFI-15446: When invoking ConnectorMethod, make sure to serialize/des…
(#10750)
* NIFI-15446: When invoking ConnectorMethod, make sure to
serialize/deserialize the arguments and return value
* NIFI-15446: Addressed review feedback
---
.../nifi/controller/StandardProcessorNode.java | 40 +++-
.../service/StandardControllerServiceNode.java | 41 +++-
.../org/apache/nifi/controller/ProcessorNode.java | 2 +-
.../controller/service/ControllerServiceNode.java | 2 +-
.../StandaloneControllerServiceFacade.java | 50 +++-
.../standalone/StandaloneProcessorFacade.java | 52 ++++-
.../tests/system/CalculateConnector.java | 257 +++++++++++++++++++++
.../nifi/processors/tests/system/Calculate.java | 71 ++++++
.../org.apache.nifi.components.connector.Connector | 1 +
.../services/org.apache.nifi.processor.Processor | 1 +
.../connectors/ConnectorMethodMarshallingIT.java | 71 ++++++
11 files changed, 559 insertions(+), 29 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 9e9b7a02e8..f2095aa4ba 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.controller;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Restricted;
@@ -140,7 +144,9 @@ import java.util.stream.Stream;
public class StandardProcessorNode extends ProcessorNode implements
Connectable {
private static final Logger LOG =
LoggerFactory.getLogger(StandardProcessorNode.class);
-
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
public static final String DEFAULT_YIELD_PERIOD = "1 sec";
@@ -1941,31 +1947,36 @@ public class StandardProcessorNode extends
ProcessorNode implements Connectable
}
@Override
- public Object invokeConnectorMethod(final String methodName, final
Map<String, Object> arguments, final ProcessContext processContext) throws
InvocationFailedException {
+ public String invokeConnectorMethod(final String methodName, final
Map<String, String> jsonArguments, final ProcessContext processContext) throws
InvocationFailedException {
final ConfigurableComponent component = getComponent();
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(getExtensionManager(),
component.getClass(), getIdentifier())) {
final Method implementationMethod =
discoverConnectorMethod(component.getClass(), methodName);
final MethodArgument[] methodArguments =
getConnectorMethodArguments(methodName, implementationMethod, component);
final List<Object> argumentValues = new ArrayList<>();
+
for (final MethodArgument methodArgument : methodArguments) {
- final Object argumentValue =
arguments.get(methodArgument.name());
if (ProcessContext.class.equals(methodArgument.type())) {
continue;
}
- if (argumentValue == null && methodArgument.required()) {
+ final String jsonValue =
jsonArguments.get(methodArgument.name());
+ if (jsonValue == null && methodArgument.required()) {
throw new IllegalArgumentException("Cannot invoke
Connector Method '" + methodName + "' on " + this + " because the required
argument '"
+ methodArgument.name() + "' was not provided");
}
- if (argumentValue != null &&
!(methodArgument.type().isAssignableFrom(argumentValue.getClass()))) {
- throw new IllegalArgumentException("Cannot invoke
Connector Method '" + methodName + "' on " + this + " because the argument '"
- + methodArgument.name() + "' is of type " +
argumentValue.getClass().getName() + " defined by " +
argumentValue.getClass().getClassLoader()
- + " but the method expects type " +
methodArgument.type().getName() + " defined by " +
methodArgument.type().getClassLoader());
+ if (jsonValue == null) {
+ argumentValues.add(null);
+ } else {
+ try {
+ final Object argumentValue =
OBJECT_MAPPER.readValue(jsonValue, methodArgument.type());
+ argumentValues.add(argumentValue);
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to
deserialize argument '" + methodArgument.name() + "' as type " +
methodArgument.type().getName() +
+ " for Connector
Method '" + methodName + "' on " + this, e);
+ }
}
-
- argumentValues.add(argumentValue);
}
// Inject ProcessContext if the method signature supports it
@@ -1979,7 +1990,14 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
try {
implementationMethod.setAccessible(true);
- return implementationMethod.invoke(component,
argumentValues.toArray());
+ final Object result = implementationMethod.invoke(component,
argumentValues.toArray());
+ if (result == null) {
+ return null;
+ }
+
+ return OBJECT_MAPPER.writeValueAsString(result);
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to serialize
return value for Connector Method '" + methodName + "' on " + this, e);
} catch (final Exception e) {
throw new InvocationFailedException(e);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 890542e703..72214fdc3c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.controller.service;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -104,6 +108,9 @@ import java.util.stream.Collectors;
public class StandardControllerServiceNode extends AbstractComponentNode
implements ControllerServiceNode {
private static final Logger LOG =
LoggerFactory.getLogger(StandardControllerServiceNode.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
private static final long INCREMENTAL_VALIDATION_DELAY_MS = 1000;
private static final Duration MAXIMUM_DELAY = Duration.ofMinutes(10);
@@ -895,37 +902,40 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
}
}
- // TODO: Refactor, this is the same between ProcessorNode /
ControllerServiceNode except for getComponentState
@Override
- public Object invokeConnectorMethod(final String methodName, final
Map<String, Object> arguments, final ConfigurationContext configurationContext)
throws InvocationFailedException {
+ public String invokeConnectorMethod(final String methodName, final
Map<String, String> jsonArguments, final ConfigurationContext
configurationContext) throws InvocationFailedException {
final ConfigurableComponent component = getComponent();
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(getExtensionManager(),
component.getClass(), getIdentifier())) {
final Method implementationMethod =
discoverConnectorMethod(component.getClass(), methodName);
final MethodArgument[] methodArguments =
getConnectorMethodArguments(methodName, implementationMethod, component);
final List<Object> argumentValues = new ArrayList<>();
+
for (final MethodArgument methodArgument : methodArguments) {
- final Object argumentValue =
arguments.get(methodArgument.name());
if (ConfigurationContext.class.equals(methodArgument.type())) {
continue;
}
- if (argumentValue == null && methodArgument.required()) {
+ final String jsonValue =
jsonArguments.get(methodArgument.name());
+ if (jsonValue == null && methodArgument.required()) {
throw new IllegalArgumentException("Cannot invoke
Connector Method '" + methodName + "' on " + this + " because the required
argument '"
+ methodArgument.name()
+ "' was not provided");
}
- if (argumentValue != null &&
!(methodArgument.type().isAssignableFrom(argumentValue.getClass()))) {
- throw new IllegalArgumentException("Cannot invoke
Connector Method '" + methodName + "' on " + this + " because the argument '"
- + methodArgument.name()
+ "' is of type " + argumentValue.getClass().getName() + " defined by " +
argumentValue.getClass().getClassLoader()
- + " but the method
expects type " + methodArgument.type().getName() + " defined by " +
methodArgument.type().getClassLoader());
+ if (jsonValue == null) {
+ argumentValues.add(null);
+ } else {
+ try {
+ final Object argumentValue =
OBJECT_MAPPER.readValue(jsonValue, methodArgument.type());
+ argumentValues.add(argumentValue);
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to
deserialize argument '" + methodArgument.name() + "' as type " +
methodArgument.type().getName()
+ + " for Connector
Method '" + methodName + "' on " + this, e);
+ }
}
-
- argumentValues.add(argumentValue);
}
// Inject ConfigurationContext if the method signature supports it
- // TODO: Can we move away from Maps and instead just use
reflection to get the arguments directly?
final Class<?>[] argumentTypes =
implementationMethod.getParameterTypes();
if (argumentTypes.length > 0 &&
ConfigurationContext.class.isAssignableFrom(argumentTypes[0])) {
argumentValues.addFirst(configurationContext);
@@ -936,7 +946,14 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
try {
implementationMethod.setAccessible(true);
- return implementationMethod.invoke(component,
argumentValues.toArray());
+ final Object result = implementationMethod.invoke(component,
argumentValues.toArray());
+ if (result == null) {
+ return null;
+ }
+
+ return OBJECT_MAPPER.writeValueAsString(result);
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to serialize
return value for Connector Method '" + methodName + "' on " + this, e);
} catch (final Exception e) {
throw new InvocationFailedException(e);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 001fae6785..fce8eeb21e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -312,7 +312,7 @@ public abstract class ProcessorNode extends
AbstractComponentNode implements Con
public abstract void migrateConfiguration(Map<String, String>
originalPropertyValues, ControllerServiceFactory serviceFactory);
- public abstract Object invokeConnectorMethod(String methodName,
Map<String, Object> arguments, ProcessContext processContext) throws
InvocationFailedException;
+ public abstract String invokeConnectorMethod(String methodName,
Map<String, String> jsonArguments, ProcessContext processContext) throws
InvocationFailedException;
public abstract List<ConnectorMethod> getConnectorMethods();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index cc7e2319d9..79dbee5ede 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -263,7 +263,7 @@ public interface ControllerServiceNode extends
ComponentNode, VersionedComponent
void migrateConfiguration(Map<String, String> originalPropertyValues,
ControllerServiceFactory serviceFactory);
- Object invokeConnectorMethod(String methodName, Map<String, Object>
arguments, ConfigurationContext configurationContext) throws
InvocationFailedException;
+ String invokeConnectorMethod(String methodName, Map<String, String>
jsonArguments, ConfigurationContext configurationContext) throws
InvocationFailedException;
List<ConnectorMethod> getConnectorMethods();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java
index 1ec92cad5c..59cbb92516 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java
@@ -17,6 +17,10 @@
package org.apache.nifi.components.connector.facades.standalone;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ValidationContext;
@@ -39,10 +43,15 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterLookup;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StandaloneControllerServiceFacade implements
ControllerServiceFacade {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
+
private final ControllerServiceNode controllerServiceNode;
private final VersionedControllerService versionedControllerService;
private final ParameterContext parameterContext;
@@ -161,12 +170,49 @@ public class StandaloneControllerServiceFacade implements
ControllerServiceFacad
@Override
public Object invokeConnectorMethod(final String methodName, final
Map<String, Object> arguments) throws InvocationFailedException {
+ final Map<String, String> jsonArguments =
serializeArgumentsToJson(arguments);
final ConfigurationContext configurationContext =
componentContextProvider.createConfigurationContext(controllerServiceNode,
parameterContext);
- return controllerServiceNode.invokeConnectorMethod(methodName,
arguments, configurationContext);
+ final String jsonResult =
controllerServiceNode.invokeConnectorMethod(methodName, jsonArguments,
configurationContext);
+ if (jsonResult == null) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(jsonResult, Object.class);
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to deserialize return
value from Connector Method '" + methodName + "'", e);
+ }
}
@Override
public <T> T invokeConnectorMethod(final String methodName, final
Map<String, Object> arguments, final Class<T> returnType) throws
InvocationFailedException {
- return returnType.cast(invokeConnectorMethod(methodName, arguments));
+ final Map<String, String> jsonArguments =
serializeArgumentsToJson(arguments);
+ final ConfigurationContext configurationContext =
componentContextProvider.createConfigurationContext(controllerServiceNode,
parameterContext);
+ final String jsonResult =
controllerServiceNode.invokeConnectorMethod(methodName, jsonArguments,
configurationContext);
+ if (jsonResult == null) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(jsonResult, returnType);
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to deserialize return
value from Connector Method '" + methodName + "'", e);
+ }
+ }
+
+ private Map<String, String> serializeArgumentsToJson(final Map<String,
Object> arguments) throws InvocationFailedException {
+ final Map<String, String> jsonArguments = new HashMap<>();
+ for (final Map.Entry<String, Object> entry : arguments.entrySet()) {
+ if (entry.getValue() == null) {
+ jsonArguments.put(entry.getKey(), null);
+ } else {
+ try {
+ jsonArguments.put(entry.getKey(),
OBJECT_MAPPER.writeValueAsString(entry.getValue()));
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to serialize
argument '" + entry.getKey() + "' to JSON", e);
+ }
+ }
+ }
+ return jsonArguments;
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java
index 5fe3f9ef83..707aa8b67c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java
@@ -17,6 +17,10 @@
package org.apache.nifi.components.connector.facades.standalone;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ValidationContext;
@@ -41,10 +45,15 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.processor.ProcessContext;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StandaloneProcessorFacade implements ProcessorFacade {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
+
private final ProcessorNode processorNode;
private final VersionedProcessor versionedProcessor;
private final ParameterContext parameterContext;
@@ -169,12 +178,51 @@ public class StandaloneProcessorFacade implements
ProcessorFacade {
@Override
public Object invokeConnectorMethod(final String methodName, final
Map<String, Object> arguments) throws InvocationFailedException {
+ final Map<String, String> jsonArguments =
serializeArgumentsToJson(arguments);
final ProcessContext processContext =
componentContextProvider.createProcessContext(processorNode, parameterContext);
- return processorNode.invokeConnectorMethod(methodName, arguments,
processContext);
+ final String jsonResult =
processorNode.invokeConnectorMethod(methodName, jsonArguments, processContext);
+ if (jsonResult == null) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(jsonResult, Object.class);
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to deserialize return
value from Connector Method '" + methodName + "'", e);
+ }
}
@Override
public <T> T invokeConnectorMethod(final String methodName, final
Map<String, Object> arguments, final Class<T> returnType) throws
InvocationFailedException {
- return returnType.cast(invokeConnectorMethod(methodName, arguments));
+ final Map<String, String> jsonArguments =
serializeArgumentsToJson(arguments);
+ final ProcessContext processContext =
componentContextProvider.createProcessContext(processorNode, parameterContext);
+ final String jsonResult =
processorNode.invokeConnectorMethod(methodName, jsonArguments, processContext);
+ if (jsonResult == null) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(jsonResult, returnType);
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to deserialize return
value from Connector Method '" + methodName + "'", e);
+ }
+ }
+
+ private Map<String, String> serializeArgumentsToJson(final Map<String,
Object> arguments) throws InvocationFailedException {
+ final Map<String, String> jsonArguments = new HashMap<>();
+
+ for (final Map.Entry<String, Object> entry : arguments.entrySet()) {
+ if (entry.getValue() == null) {
+ jsonArguments.put(entry.getKey(), null);
+ } else {
+ try {
+ jsonArguments.put(entry.getKey(),
OBJECT_MAPPER.writeValueAsString(entry.getValue()));
+ } catch (final JsonProcessingException e) {
+ throw new InvocationFailedException("Failed to serialize
argument '" + entry.getKey() + "' to JSON", e);
+ }
+ }
+ }
+
+ return jsonArguments;
}
}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
new file mode 100644
index 0000000000..db43577038
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectors.tests.system;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.ConnectorPropertyDescriptor;
+import org.apache.nifi.components.connector.ConnectorPropertyGroup;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.InvocationFailedException;
+import org.apache.nifi.components.connector.PropertyType;
+import org.apache.nifi.components.connector.StepConfigurationContext;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.components.connector.components.ProcessorFacade;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A test connector that invokes a ConnectorMethod on the Calculate processor
using its own POJO types.
+ * This connector is used to test JSON marshalling of complex objects across
ClassLoader boundaries.
+ */
+public class CalculateConnector extends AbstractConnector {
+
+ /**
+ * A POJO representing a calculation request. This is intentionally a
different class than
+ * the Processor's record type to test cross-ClassLoader JSON marshalling.
+ */
+ public static class Calculation {
+ private int operand1;
+ private int operand2;
+ private String operation;
+
+ public Calculation() {
+ }
+
+ public Calculation(final int operand1, final int operand2, final
String operation) {
+ this.operand1 = operand1;
+ this.operand2 = operand2;
+ this.operation = operation;
+ }
+
+ public int getOperand1() {
+ return operand1;
+ }
+
+ public void setOperand1(final int operand1) {
+ this.operand1 = operand1;
+ }
+
+ public int getOperand2() {
+ return operand2;
+ }
+
+ public void setOperand2(final int operand2) {
+ this.operand2 = operand2;
+ }
+
+ public String getOperation() {
+ return operation;
+ }
+
+ public void setOperation(final String operation) {
+ this.operation = operation;
+ }
+ }
+
+ /**
+ * A POJO representing the result of a calculation. This is intentionally
a different class than
+ * the Processor's record type to test cross-ClassLoader JSON marshalling.
+ */
+ public static class CalculatedResult {
+ private Calculation calculation;
+ private int result;
+
+ public CalculatedResult() {
+ }
+
+ public Calculation getCalculation() {
+ return calculation;
+ }
+
+ public void setCalculation(final Calculation calculation) {
+ this.calculation = calculation;
+ }
+
+ public int getResult() {
+ return result;
+ }
+
+ public void setResult(final int result) {
+ this.result = result;
+ }
+ }
+
+ private static final ConnectorPropertyDescriptor OPERAND_1 = new
ConnectorPropertyDescriptor.Builder()
+ .name("Operand 1")
+ .description("The first operand for the calculation")
+ .type(PropertyType.STRING)
+ .required(true)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ private static final ConnectorPropertyDescriptor OPERAND_2 = new
ConnectorPropertyDescriptor.Builder()
+ .name("Operand 2")
+ .description("The second operand for the calculation")
+ .type(PropertyType.STRING)
+ .required(true)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ private static final ConnectorPropertyDescriptor OPERATION = new
ConnectorPropertyDescriptor.Builder()
+ .name("Operation")
+ .description("The operation to perform (ADD, SUBTRACT, MULTIPLY,
DIVIDE)")
+ .type(PropertyType.STRING)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private static final ConnectorPropertyDescriptor OUTPUT_FILE = new
ConnectorPropertyDescriptor.Builder()
+ .name("Output File")
+ .description("The file to write the calculation result to")
+ .type(PropertyType.STRING)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private static final ConnectorPropertyGroup CALCULATION_GROUP = new
ConnectorPropertyGroup.Builder()
+ .name("Calculation Configuration")
+ .description("Configuration properties for the calculation")
+ .properties(List.of(OPERAND_1, OPERAND_2, OPERATION, OUTPUT_FILE))
+ .build();
+
+ private static final ConfigurationStep CALCULATION_STEP = new
ConfigurationStep.Builder()
+ .name("Calculation")
+ .description("Configure the calculation parameters")
+ .propertyGroups(List.of(CALCULATION_GROUP))
+ .build();
+
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ final VersionedProcessGroup group = new VersionedProcessGroup();
+ group.setName("Calculate Flow");
+ group.setIdentifier("calculate-flow-id");
+
+ final Bundle bundle = new Bundle();
+ bundle.setGroup("org.apache.nifi");
+ bundle.setArtifact("nifi-system-test-extensions-nar");
+ bundle.setVersion("2.7.0-SNAPSHOT");
+
+ final VersionedProcessor processor = new VersionedProcessor();
+ processor.setIdentifier("calculate-processor-id");
+ processor.setName("Calculate Processor");
+ processor.setType("org.apache.nifi.processors.tests.system.Calculate");
+ processor.setBundle(bundle);
+ processor.setProperties(Map.of());
+ processor.setPropertyDescriptors(Map.of());
+ processor.setScheduledState(ScheduledState.ENABLED);
+ processor.setBulletinLevel("WARN");
+ processor.setPosition(new Position(0D, 0D));
+ processor.setPenaltyDuration("30 sec");
+ processor.setAutoTerminatedRelationships(Set.of());
+ processor.setExecutionNode("ALL");
+ processor.setGroupIdentifier(group.getIdentifier());
+ processor.setConcurrentlySchedulableTaskCount(1);
+ processor.setRunDurationMillis(0L);
+ processor.setSchedulingStrategy("TIMER_DRIVEN");
+ processor.setYieldDuration("1 sec");
+ processor.setSchedulingPeriod("0 sec");
+ processor.setStyle(Map.of());
+ group.setProcessors(Set.of(processor));
+
+ final VersionedExternalFlow flow = new VersionedExternalFlow();
+ flow.setFlowContents(group);
+ flow.setParameterContexts(Map.of());
+ return flow;
+ }
+
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return List.of(CALCULATION_STEP);
+ }
+
+ @Override
+ protected void onStepConfigured(final String stepName, final FlowContext
workingContext) {
+ }
+
+ @Override
+ public void applyUpdate(final FlowContext workingContext, final
FlowContext activeContext) throws FlowUpdateException {
+ final StepConfigurationContext stepContext =
workingContext.getConfigurationContext().scopedToStep(CALCULATION_STEP);
+ final int operand1 = stepContext.getProperty(OPERAND_1).asInteger();
+ final int operand2 = stepContext.getProperty(OPERAND_2).asInteger();
+ final String operation = stepContext.getProperty(OPERATION).getValue();
+ final String outputFile =
stepContext.getProperty(OUTPUT_FILE).getValue();
+
+ final ProcessorFacade processorFacade =
workingContext.getRootGroup().getProcessors().stream()
+ .filter(p -> p.getDefinition().getType().endsWith("Calculate"))
+ .findFirst()
+ .orElseThrow(() -> new FlowUpdateException("CalculateProcessor not
found in flow"));
+
+ final Calculation calculation = new Calculation(operand1, operand2,
operation);
+ final CalculatedResult result;
+ try {
+ result = processorFacade.invokeConnectorMethod("calculate",
Map.of("calculation", calculation), CalculatedResult.class);
+ } catch (final InvocationFailedException e) {
+ throw new FlowUpdateException("Failed to invoke calculate method",
e);
+ }
+
+ final File file = new File(outputFile);
+ try (final FileWriter writer = new FileWriter(file)) {
+
writer.write(String.valueOf(result.getCalculation().getOperand1()));
+ writer.write("\n");
+
writer.write(String.valueOf(result.getCalculation().getOperand2()));
+ writer.write("\n");
+ writer.write(result.getCalculation().getOperation());
+ writer.write("\n");
+ writer.write(String.valueOf(result.getResult()));
+ } catch (final IOException e) {
+ throw new FlowUpdateException("Failed to write result to file", e);
+ }
+
+ getLogger().info("Calculation result: {} {} {} = {}", operand1,
operation, operand2, result.getResult());
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final Map<String, String> propertyValueOverrides, final FlowContext
flowContext) {
+ return List.of();
+ }
+}
+
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java
new file mode 100644
index 0000000000..d8bc65b8a5
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.connector.components.ComponentState;
+import org.apache.nifi.components.connector.components.ConnectorMethod;
+import org.apache.nifi.components.connector.components.MethodArgument;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * A test processor that exposes a ConnectorMethod for performing calculations.
+ * This processor is used to test JSON marshalling of complex objects across
+ * ClassLoader boundaries.
+ */
+public class Calculate extends AbstractProcessor {
+
+ /**
+ * Applies the requested arithmetic operation to the two integer operands.
+ * The operation name is matched exactly (case-sensitive) against ADD,
+ * SUBTRACT, MULTIPLY and DIVIDE. All arithmetic is plain {@code int}
+ * arithmetic, so DIVIDE truncates toward zero, a zero divisor raises
+ * {@link ArithmeticException}, and overflow wraps silently.
+ *
+ * The declared return type is {@code Object}, but the value returned is
+ * always a {@link CalculatedResult}.
+ *
+ * @param calculation the operands and the operation name to apply
+ * @return a CalculatedResult pairing the request with the computed value
+ * @throws IllegalArgumentException if the operation name is not recognized
+ */
+ @ConnectorMethod(
+ name = "calculate",
+ description = "Performs a calculation based on the provided
Calculation object",
+ allowedStates = ComponentState.STOPPED,
+ arguments = {
+ @MethodArgument(name = "calculation", type = Calculation.class,
description = "The calculation to perform", required = true)
+ }
+ )
+ public Object calculate(final Calculation calculation) {
+ final int result = switch (calculation.operation()) {
+ case "ADD" -> calculation.operand1() + calculation.operand2();
+ case "SUBTRACT" -> calculation.operand1() - calculation.operand2();
+ case "MULTIPLY" -> calculation.operand1() * calculation.operand2();
+ case "DIVIDE" -> calculation.operand1() / calculation.operand2();
+ default -> throw new IllegalArgumentException("Unknown operation:
" + calculation.operation());
+ };
+
+ return new CalculatedResult(calculation, result);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext processContext, final
ProcessSession processSession) throws ProcessException {
+ // No work is performed when the processor is triggered.
+ }
+
+ /**
+ * A record representing a calculation request with two operands and an
+ * operation. The operation is carried as a plain string; see
+ * {@link #calculate(Calculation)} for the names that are recognized.
+ */
+ public record Calculation(int operand1, int operand2, String operation) {
+ }
+
+ /**
+ * A record representing the result of a calculation, including the
+ * original calculation and the result.
+ */
+ public record CalculatedResult(Calculation calculation, int result) {
+ }
+
+}
+
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index a54f75d132..34fe7c90ea 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -17,3 +17,4 @@ org.apache.nifi.connectors.tests.system.NopConnector
org.apache.nifi.connectors.tests.system.AssetConnector
org.apache.nifi.connectors.tests.system.DataQueuingConnector
org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector
+org.apache.nifi.connectors.tests.system.CalculateConnector
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d0a01b38b5..93e561e6d8 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -62,3 +62,4 @@
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
org.apache.nifi.processors.tests.system.WriteLifecycleEvents
org.apache.nifi.processors.tests.system.WriteToFile
org.apache.nifi.processors.tests.system.YieldSource
+org.apache.nifi.processors.tests.system.Calculate
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java
new file mode 100644
index 0000000000..e57bf852d2
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System tests that verify JSON marshalling of complex objects across
+ * ClassLoader boundaries
+ * when invoking ConnectorMethods on Processors.
+ */
+public class ConnectorMethodMarshallingIT extends NiFiSystemIT {
+
+ /**
+ * Creates a CalculateConnector, configures its "Calculation" step with
+ * 10 ADD 5 and an output file path, applies the update, and then checks
+ * that the output file holds exactly four newline-separated values:
+ * operand 1, operand 2, the operation name, and the result.
+ */
+ @Test
+ public void testComplexObjectMarshalling() throws NiFiClientException,
IOException, InterruptedException {
+ final File outputFile = new File("target/calculate-result.txt");
+ // Remove output left by an earlier run so the existence check further
+ // down cannot pass on stale data.
+ if (outputFile.exists()) {
+ assertTrue(outputFile.delete(), "Failed to delete existing output
file");
+ }
+
+ final ConnectorEntity connector =
getClientUtil().createConnector("CalculateConnector");
+ assertNotNull(connector);
+
+ getClientUtil().configureConnector(connector, "Calculation", Map.of(
+ "Operand 1", "10",
+ "Operand 2", "5",
+ "Operation", "ADD",
+ "Output File", outputFile.getAbsolutePath()
+ ));
+
+ // NOTE(review): the file is presumably written by the connector while the
+ // update is applied; confirm against CalculateConnector.
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connector.getId());
+
+ assertTrue(outputFile.exists(), "Output file was not created");
+ final String fileContents = Files.readString(outputFile.toPath(),
StandardCharsets.UTF_8);
+ final String[] lines = fileContents.split("\n");
+ assertEquals(4, lines.length, "Output file should contain 4 lines");
+ assertEquals("10", lines[0], "First line should be operand1");
+ assertEquals("5", lines[1], "Second line should be operand2");
+ assertEquals("ADD", lines[2], "Third line should be the operation");
+ assertEquals("15", lines[3], "Fourth line should be the result (10 + 5
= 15)");
+ }
+}
+