This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 58735ea30efed9da4b9d0b156aabe156ecff7ca3 Author: Mark Payne <[email protected]> AuthorDate: Thu Jan 15 11:09:17 2026 -0500 NIFI-15446: When invoking ConnectorMethod, make sure to serialize/des… (#10750) * NIFI-15446: When invoking ConnectorMethod, make sure to serialize/deserialize the arguments and return value * NIFI-15446: Addressed review feedback --- .../nifi/controller/StandardProcessorNode.java | 40 +++- .../service/StandardControllerServiceNode.java | 41 +++- .../org/apache/nifi/controller/ProcessorNode.java | 2 +- .../controller/service/ControllerServiceNode.java | 2 +- .../StandaloneControllerServiceFacade.java | 50 +++- .../standalone/StandaloneProcessorFacade.java | 52 ++++- .../tests/system/CalculateConnector.java | 257 +++++++++++++++++++++ .../nifi/processors/tests/system/Calculate.java | 71 ++++++ .../org.apache.nifi.components.connector.Connector | 1 + .../services/org.apache.nifi.processor.Processor | 1 + .../connectors/ConnectorMethodMarshallingIT.java | 71 ++++++ 11 files changed, 559 insertions(+), 29 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 3823428856..7570cfe9e1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.controller; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Restricted; @@ -140,7 +144,9 @@ import java.util.stream.Stream; public class StandardProcessorNode extends ProcessorNode implements Connectable { private static final Logger LOG = LoggerFactory.getLogger(StandardProcessorNode.class); - + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL); public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; public static final String DEFAULT_YIELD_PERIOD = "1 sec"; @@ -1941,31 +1947,36 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public Object invokeConnectorMethod(final String methodName, final Map<String, Object> arguments, final ProcessContext processContext) throws InvocationFailedException { + public String invokeConnectorMethod(final String methodName, final Map<String, String> jsonArguments, final ProcessContext processContext) throws InvocationFailedException { final ConfigurableComponent component = getComponent(); try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(getExtensionManager(), component.getClass(), getIdentifier())) { final Method implementationMethod = discoverConnectorMethod(component.getClass(), methodName); final MethodArgument[] methodArguments = getConnectorMethodArguments(methodName, implementationMethod, component); final List<Object> argumentValues = new ArrayList<>(); + for (final MethodArgument methodArgument : methodArguments) { - final Object argumentValue = arguments.get(methodArgument.name()); if (ProcessContext.class.equals(methodArgument.type())) { continue; } - if (argumentValue == null && methodArgument.required()) { + final String jsonValue = jsonArguments.get(methodArgument.name()); + if (jsonValue == null && methodArgument.required()) { throw new IllegalArgumentException("Cannot invoke Connector Method '" + methodName + "' on " + this + " because the required argument '" + methodArgument.name() + "' was not provided"); } - if (argumentValue != null && !(methodArgument.type().isAssignableFrom(argumentValue.getClass()))) { - throw new IllegalArgumentException("Cannot invoke Connector Method '" + methodName + "' on " + this + " because the argument '" - + methodArgument.name() + "' is of type " + argumentValue.getClass().getName() + " defined by " + argumentValue.getClass().getClassLoader() - + " but the method expects type " + methodArgument.type().getName() + " defined by " + methodArgument.type().getClassLoader()); + if (jsonValue == null) { + argumentValues.add(null); + } else { + try { + final Object argumentValue = OBJECT_MAPPER.readValue(jsonValue, methodArgument.type()); + argumentValues.add(argumentValue); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to deserialize argument '" + methodArgument.name() + "' as type " + methodArgument.type().getName() + + " for Connector Method '" + methodName + "' on " + this, e); + } } - - argumentValues.add(argumentValue); } // Inject ProcessContext if the method signature supports it @@ -1979,7 +1990,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable try { implementationMethod.setAccessible(true); - return implementationMethod.invoke(component, argumentValues.toArray()); + final Object result = implementationMethod.invoke(component, argumentValues.toArray()); + if (result == null) { + return null; + } + + return OBJECT_MAPPER.writeValueAsString(result); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to serialize return value for Connector Method '" + methodName + "' on " + this, e); } catch (final Exception e) { throw new InvocationFailedException(e); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 57efd27d08..6bcc7a2c4d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.controller.service; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.documentation.DeprecationNotice; @@ -104,6 +108,9 @@ import java.util.stream.Collectors; public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode { private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL); private static final long INCREMENTAL_VALIDATION_DELAY_MS = 1000; private static final Duration MAXIMUM_DELAY = Duration.ofMinutes(10); @@ -904,37 +911,40 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme } } - // TODO: Refactor, this is the same between ProcessorNode / ControllerServiceNode except for getComponentState @Override - public Object invokeConnectorMethod(final String methodName, final Map<String, Object> arguments, final ConfigurationContext configurationContext) throws InvocationFailedException { + public String invokeConnectorMethod(final String methodName, final Map<String, String> jsonArguments, final ConfigurationContext configurationContext) throws InvocationFailedException { final ConfigurableComponent component = getComponent(); try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(getExtensionManager(), component.getClass(), getIdentifier())) { final Method implementationMethod = discoverConnectorMethod(component.getClass(), methodName); final MethodArgument[] methodArguments = getConnectorMethodArguments(methodName, implementationMethod, component); final List<Object> argumentValues = new ArrayList<>(); + for (final MethodArgument methodArgument : methodArguments) { - final Object argumentValue = arguments.get(methodArgument.name()); if (ConfigurationContext.class.equals(methodArgument.type())) { continue; } - if (argumentValue == null && methodArgument.required()) { + final String jsonValue = jsonArguments.get(methodArgument.name()); + if (jsonValue == null && methodArgument.required()) { throw new IllegalArgumentException("Cannot invoke Connector Method '" + methodName + "' on " + this + " because the required argument '" + methodArgument.name() + "' was not provided"); } - if (argumentValue != null && !(methodArgument.type().isAssignableFrom(argumentValue.getClass()))) { - throw new IllegalArgumentException("Cannot invoke Connector Method '" + methodName + "' on " + this + " because the argument '" - + methodArgument.name() + "' is of type " + argumentValue.getClass().getName() + " defined by " + argumentValue.getClass().getClassLoader() - + " but the method expects type " + methodArgument.type().getName() + " defined by " + methodArgument.type().getClassLoader()); + if (jsonValue == null) { + argumentValues.add(null); + } else { + try { + final Object argumentValue = OBJECT_MAPPER.readValue(jsonValue, methodArgument.type()); + argumentValues.add(argumentValue); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to deserialize argument '" + methodArgument.name() + "' as type " + methodArgument.type().getName() + + " for Connector Method '" + methodName + "' on " + this, e); + } } - - argumentValues.add(argumentValue); } // Inject ConfigurationContext if the method signature supports it - // TODO: Can we move away from Maps and instead just use reflection to get the arguments directly? final Class<?>[] argumentTypes = implementationMethod.getParameterTypes(); if (argumentTypes.length > 0 && ConfigurationContext.class.isAssignableFrom(argumentTypes[0])) { argumentValues.addFirst(configurationContext); @@ -945,7 +955,14 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme try { implementationMethod.setAccessible(true); - return implementationMethod.invoke(component, argumentValues.toArray()); + final Object result = implementationMethod.invoke(component, argumentValues.toArray()); + if (result == null) { + return null; + } + + return OBJECT_MAPPER.writeValueAsString(result); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to serialize return value for Connector Method '" + methodName + "' on " + this, e); } catch (final Exception e) { throw new InvocationFailedException(e); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 001fae6785..fce8eeb21e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -312,7 +312,7 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con public abstract void migrateConfiguration(Map<String, String> originalPropertyValues, ControllerServiceFactory serviceFactory); - public abstract Object invokeConnectorMethod(String methodName, Map<String, Object> arguments, ProcessContext processContext) throws InvocationFailedException; + public abstract String invokeConnectorMethod(String methodName, Map<String, String> jsonArguments, ProcessContext processContext) throws InvocationFailedException; public abstract List<ConnectorMethod> getConnectorMethods(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 2b2f78723e..16b8b7e8d3 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -263,7 +263,7 @@ public interface ControllerServiceNode extends ComponentNode, VersionedComponent void migrateConfiguration(Map<String, String> originalPropertyValues, ControllerServiceFactory serviceFactory); - Object invokeConnectorMethod(String methodName, Map<String, Object> arguments, ConfigurationContext configurationContext) throws InvocationFailedException; + String invokeConnectorMethod(String methodName, Map<String, String> jsonArguments, ConfigurationContext configurationContext) throws InvocationFailedException; List<ConnectorMethod> getConnectorMethods(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java index 1ec92cad5c..59cbb92516 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java @@ -17,6 +17,10 @@ package org.apache.nifi.components.connector.facades.standalone; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.asset.AssetManager; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ValidationContext; @@ -39,10 +43,15 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterLookup; +import java.util.HashMap; import java.util.List; import java.util.Map; public class StandaloneControllerServiceFacade implements ControllerServiceFacade { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL); + private final ControllerServiceNode controllerServiceNode; private final VersionedControllerService versionedControllerService; private final ParameterContext parameterContext; @@ -161,12 +170,49 @@ public class StandaloneControllerServiceFacade implements ControllerServiceFacad @Override public Object invokeConnectorMethod(final String methodName, final Map<String, Object> arguments) throws InvocationFailedException { + final Map<String, String> jsonArguments = serializeArgumentsToJson(arguments); final ConfigurationContext configurationContext = componentContextProvider.createConfigurationContext(controllerServiceNode, parameterContext); - return controllerServiceNode.invokeConnectorMethod(methodName, arguments, configurationContext); + final String jsonResult = controllerServiceNode.invokeConnectorMethod(methodName, jsonArguments, configurationContext); + if (jsonResult == null) { + return null; + } + + try { + return OBJECT_MAPPER.readValue(jsonResult, Object.class); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to deserialize return value from Connector Method '" + methodName + "'", e); + } } @Override public <T> T invokeConnectorMethod(final String methodName, final Map<String, Object> arguments, final Class<T> returnType) throws InvocationFailedException { - return returnType.cast(invokeConnectorMethod(methodName, arguments)); + final Map<String, String> jsonArguments = serializeArgumentsToJson(arguments); + final ConfigurationContext configurationContext = componentContextProvider.createConfigurationContext(controllerServiceNode, parameterContext); + final String jsonResult = controllerServiceNode.invokeConnectorMethod(methodName, jsonArguments, configurationContext); + if (jsonResult == null) { + return null; + } + + try { + return OBJECT_MAPPER.readValue(jsonResult, returnType); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to deserialize return value from Connector Method '" + methodName + "'", e); + } + } + + private Map<String, String> serializeArgumentsToJson(final Map<String, Object> arguments) throws InvocationFailedException { + final Map<String, String> jsonArguments = new HashMap<>(); + for (final Map.Entry<String, Object> entry : arguments.entrySet()) { + if (entry.getValue() == null) { + jsonArguments.put(entry.getKey(), null); + } else { + try { + jsonArguments.put(entry.getKey(), OBJECT_MAPPER.writeValueAsString(entry.getValue())); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to serialize argument '" + entry.getKey() + "' to JSON", e); + } + } + } + return jsonArguments; } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java index 5fe3f9ef83..707aa8b67c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java @@ -17,6 +17,10 @@ package org.apache.nifi.components.connector.facades.standalone; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.asset.AssetManager; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ValidationContext; @@ -41,10 +45,15 @@ import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.processor.ProcessContext; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; public class StandaloneProcessorFacade implements ProcessorFacade { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL); + private final ProcessorNode processorNode; private final VersionedProcessor versionedProcessor; private final ParameterContext parameterContext; @@ -169,12 +178,51 @@ public class StandaloneProcessorFacade implements ProcessorFacade { @Override public Object invokeConnectorMethod(final String methodName, final Map<String, Object> arguments) throws InvocationFailedException { + final Map<String, String> jsonArguments = serializeArgumentsToJson(arguments); final ProcessContext processContext = componentContextProvider.createProcessContext(processorNode, parameterContext); - return processorNode.invokeConnectorMethod(methodName, arguments, processContext); + final String jsonResult = processorNode.invokeConnectorMethod(methodName, jsonArguments, processContext); + if (jsonResult == null) { + return null; + } + + try { + return OBJECT_MAPPER.readValue(jsonResult, Object.class); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to deserialize return value from Connector Method '" + methodName + "'", e); + } } @Override public <T> T invokeConnectorMethod(final String methodName, final Map<String, Object> arguments, final Class<T> returnType) throws InvocationFailedException { - return returnType.cast(invokeConnectorMethod(methodName, arguments)); + final Map<String, String> jsonArguments = serializeArgumentsToJson(arguments); + final ProcessContext processContext = componentContextProvider.createProcessContext(processorNode, parameterContext); + final String jsonResult = processorNode.invokeConnectorMethod(methodName, jsonArguments, processContext); + if (jsonResult == null) { + return null; + } + + try { + return OBJECT_MAPPER.readValue(jsonResult, returnType); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to deserialize return value from Connector Method '" + methodName + "'", e); + } + } + + private Map<String, String> serializeArgumentsToJson(final Map<String, Object> arguments) throws InvocationFailedException { + final Map<String, String> jsonArguments = new HashMap<>(); + + for (final Map.Entry<String, Object> entry : arguments.entrySet()) { + if (entry.getValue() == null) { + jsonArguments.put(entry.getKey(), null); + } else { + try { + jsonArguments.put(entry.getKey(), OBJECT_MAPPER.writeValueAsString(entry.getValue())); + } catch (final JsonProcessingException e) { + throw new InvocationFailedException("Failed to serialize argument '" + entry.getKey() + "' to JSON", e); + } + } + } + + return jsonArguments; } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java new file mode 100644 index 0000000000..db43577038 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.connectors.tests.system; + +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.connector.AbstractConnector; +import org.apache.nifi.components.connector.ConfigurationStep; +import org.apache.nifi.components.connector.ConnectorPropertyDescriptor; +import org.apache.nifi.components.connector.ConnectorPropertyGroup; +import org.apache.nifi.components.connector.FlowUpdateException; +import org.apache.nifi.components.connector.InvocationFailedException; +import org.apache.nifi.components.connector.PropertyType; +import org.apache.nifi.components.connector.StepConfigurationContext; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.components.ProcessorFacade; +import org.apache.nifi.flow.Bundle; +import org.apache.nifi.flow.Position; +import org.apache.nifi.flow.ScheduledState; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * A test connector that invokes a ConnectorMethod on a CalculateProcessor using its own POJO types. + * This connector is used to test JSON marshalling of complex objects across ClassLoader boundaries. + */ +public class CalculateConnector extends AbstractConnector { + + /** + * A POJO representing a calculation request. This is intentionally a different class than + * the Processor's record type to test cross-ClassLoader JSON marshalling. + */ + public static class Calculation { + private int operand1; + private int operand2; + private String operation; + + public Calculation() { + } + + public Calculation(final int operand1, final int operand2, final String operation) { + this.operand1 = operand1; + this.operand2 = operand2; + this.operation = operation; + } + + public int getOperand1() { + return operand1; + } + + public void setOperand1(final int operand1) { + this.operand1 = operand1; + } + + public int getOperand2() { + return operand2; + } + + public void setOperand2(final int operand2) { + this.operand2 = operand2; + } + + public String getOperation() { + return operation; + } + + public void setOperation(final String operation) { + this.operation = operation; + } + } + + /** + * A POJO representing the result of a calculation. This is intentionally a different class than + * the Processor's record type to test cross-ClassLoader JSON marshalling. + */ + public static class CalculatedResult { + private Calculation calculation; + private int result; + + public CalculatedResult() { + } + + public Calculation getCalculation() { + return calculation; + } + + public void setCalculation(final Calculation calculation) { + this.calculation = calculation; + } + + public int getResult() { + return result; + } + + public void setResult(final int result) { + this.result = result; + } + } + + private static final ConnectorPropertyDescriptor OPERAND_1 = new ConnectorPropertyDescriptor.Builder() + .name("Operand 1") + .description("The first operand for the calculation") + .type(PropertyType.STRING) + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + private static final ConnectorPropertyDescriptor OPERAND_2 = new ConnectorPropertyDescriptor.Builder() + .name("Operand 2") + .description("The second operand for the calculation") + .type(PropertyType.STRING) + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + private static final ConnectorPropertyDescriptor OPERATION = new ConnectorPropertyDescriptor.Builder() + .name("Operation") + .description("The operation to perform (ADD, SUBTRACT, MULTIPLY, DIVIDE)") + .type(PropertyType.STRING) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final ConnectorPropertyDescriptor OUTPUT_FILE = new ConnectorPropertyDescriptor.Builder() + .name("Output File") + .description("The file to write the calculation result to") + .type(PropertyType.STRING) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final ConnectorPropertyGroup CALCULATION_GROUP = new ConnectorPropertyGroup.Builder() + .name("Calculation Configuration") + .description("Configuration properties for the calculation") + .properties(List.of(OPERAND_1, OPERAND_2, OPERATION, OUTPUT_FILE)) + .build(); + + private static final ConfigurationStep CALCULATION_STEP = new ConfigurationStep.Builder() + .name("Calculation") + .description("Configure the calculation parameters") + .propertyGroups(List.of(CALCULATION_GROUP)) + .build(); + + @Override + public VersionedExternalFlow getInitialFlow() { + final VersionedProcessGroup group = new VersionedProcessGroup(); + group.setName("Calculate Flow"); + group.setIdentifier("calculate-flow-id"); + + final Bundle bundle = new Bundle(); + bundle.setGroup("org.apache.nifi"); + bundle.setArtifact("nifi-system-test-extensions-nar"); + bundle.setVersion("2.7.0-SNAPSHOT"); + + final VersionedProcessor processor = new VersionedProcessor(); + processor.setIdentifier("calculate-processor-id"); + processor.setName("Calculate Processor"); + processor.setType("org.apache.nifi.processors.tests.system.Calculate"); + processor.setBundle(bundle); + processor.setProperties(Map.of()); + processor.setPropertyDescriptors(Map.of()); + processor.setScheduledState(ScheduledState.ENABLED); + processor.setBulletinLevel("WARN"); + processor.setPosition(new Position(0D, 0D)); + processor.setPenaltyDuration("30 sec"); + processor.setAutoTerminatedRelationships(Set.of()); + processor.setExecutionNode("ALL"); + processor.setGroupIdentifier(group.getIdentifier()); + processor.setConcurrentlySchedulableTaskCount(1); + processor.setRunDurationMillis(0L); + processor.setSchedulingStrategy("TIMER_DRIVEN"); + processor.setYieldDuration("1 sec"); + processor.setSchedulingPeriod("0 sec"); + processor.setStyle(Map.of()); + group.setProcessors(Set.of(processor)); + + final VersionedExternalFlow flow = new VersionedExternalFlow(); + flow.setFlowContents(group); + flow.setParameterContexts(Map.of()); + return flow; + } + + @Override + public List<ConfigurationStep> getConfigurationSteps() { + return List.of(CALCULATION_STEP); + } + + @Override + protected void onStepConfigured(final String stepName, final FlowContext workingContext) { + } + + @Override + public void applyUpdate(final FlowContext workingContext, final FlowContext activeContext) throws FlowUpdateException { + final StepConfigurationContext stepContext = workingContext.getConfigurationContext().scopedToStep(CALCULATION_STEP); + final int operand1 = stepContext.getProperty(OPERAND_1).asInteger(); + final int operand2 = stepContext.getProperty(OPERAND_2).asInteger(); + final String operation = stepContext.getProperty(OPERATION).getValue(); + final String outputFile = stepContext.getProperty(OUTPUT_FILE).getValue(); + + final ProcessorFacade processorFacade = workingContext.getRootGroup().getProcessors().stream() + .filter(p -> p.getDefinition().getType().endsWith("Calculate")) + .findFirst() + .orElseThrow(() -> new FlowUpdateException("CalculateProcessor not found in flow")); + + final Calculation calculation = new Calculation(operand1, operand2, operation); + final CalculatedResult result; + try { + result = processorFacade.invokeConnectorMethod("calculate", Map.of("calculation", calculation), CalculatedResult.class); + } catch (final InvocationFailedException e) { + throw new FlowUpdateException("Failed to invoke calculate method", e); + } + + final File file = new File(outputFile); + try (final FileWriter writer = new FileWriter(file)) { + writer.write(String.valueOf(result.getCalculation().getOperand1())); + writer.write("\n"); + writer.write(String.valueOf(result.getCalculation().getOperand2())); + writer.write("\n"); + writer.write(result.getCalculation().getOperation()); + writer.write("\n"); + writer.write(String.valueOf(result.getResult())); + } catch (final IOException e) { + throw new FlowUpdateException("Failed to write result to file", e); + } + + getLogger().info("Calculation result: {} {} {} = {}", operand1, operation, operand2, result.getResult()); + } + + @Override + public List<ConfigVerificationResult> verifyConfigurationStep(final String stepName, final Map<String, String> propertyValueOverrides, final FlowContext flowContext) { + return List.of(); + } +} + diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java new file mode 100644 index 0000000000..d8bc65b8a5 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/Calculate.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.components.connector.components.ComponentState; +import org.apache.nifi.components.connector.components.ConnectorMethod; +import org.apache.nifi.components.connector.components.MethodArgument; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * A test processor that exposes a ConnectorMethod for performing calculations. + * This processor is used to test JSON marshalling of complex objects across ClassLoader boundaries. + */ +public class Calculate extends AbstractProcessor { + + @ConnectorMethod( + name = "calculate", + description = "Performs a calculation based on the provided Calculation object", + allowedStates = ComponentState.STOPPED, + arguments = { + @MethodArgument(name = "calculation", type = Calculation.class, description = "The calculation to perform", required = true) + } + ) + public Object calculate(final Calculation calculation) { + final int result = switch (calculation.operation()) { + case "ADD" -> calculation.operand1() + calculation.operand2(); + case "SUBTRACT" -> calculation.operand1() - calculation.operand2(); + case "MULTIPLY" -> calculation.operand1() * calculation.operand2(); + case "DIVIDE" -> calculation.operand1() / calculation.operand2(); + default -> throw new IllegalArgumentException("Unknown operation: " + calculation.operation()); + }; + + return new CalculatedResult(calculation, result); + } + + @Override + public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) throws ProcessException { + } + + /** + * A record representing a calculation request with two operands and an operation. + */ + public record Calculation(int operand1, int operand2, String operation) { + } + + /** + * A record representing the result of a calculation, including the original calculation and the result. + */ + public record CalculatedResult(Calculation calculation, int result) { + } + +} + diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector index a54f75d132..34fe7c90ea 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector @@ -17,3 +17,4 @@ org.apache.nifi.connectors.tests.system.NopConnector org.apache.nifi.connectors.tests.system.AssetConnector org.apache.nifi.connectors.tests.system.DataQueuingConnector org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector +org.apache.nifi.connectors.tests.system.CalculateConnector diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d0a01b38b5..93e561e6d8 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -62,3 +62,4 @@ org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile org.apache.nifi.processors.tests.system.WriteLifecycleEvents org.apache.nifi.processors.tests.system.WriteToFile org.apache.nifi.processors.tests.system.YieldSource +org.apache.nifi.processors.tests.system.Calculate diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java new file mode 100644 index 0000000000..e57bf852d2 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorMethodMarshallingIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.connectors; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectorEntity; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * System tests that verify JSON marshalling of complex objects across ClassLoader boundaries + * when invoking ConnectorMethods on Processors. + */ +public class ConnectorMethodMarshallingIT extends NiFiSystemIT { + + @Test + public void testComplexObjectMarshalling() throws NiFiClientException, IOException, InterruptedException { + final File outputFile = new File("target/calculate-result.txt"); + if (outputFile.exists()) { + assertTrue(outputFile.delete(), "Failed to delete existing output file"); + } + + final ConnectorEntity connector = getClientUtil().createConnector("CalculateConnector"); + assertNotNull(connector); + + getClientUtil().configureConnector(connector, "Calculation", Map.of( + "Operand 1", "10", + "Operand 2", "5", + "Operation", "ADD", + "Output File", outputFile.getAbsolutePath() + )); + + getClientUtil().applyConnectorUpdate(connector); + getClientUtil().waitForValidConnector(connector.getId()); + + assertTrue(outputFile.exists(), "Output file was not created"); + final String fileContents = Files.readString(outputFile.toPath(), StandardCharsets.UTF_8); + final String[] lines = fileContents.split("\n"); + assertEquals(4, lines.length, "Output file should contain 4 lines"); + assertEquals("10", lines[0], "First line should be operand1"); + assertEquals("5", lines[1], "Second line should be operand2"); + assertEquals("ADD", lines[2], "Third line should be the operation"); + assertEquals("15", lines[3], "Fourth line should be the result (10 + 5 = 15)"); + } +} +
