This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit dec34d883b146c6575daf7c16213eb650202a2ac Author: tpalfy <[email protected]> AuthorDate: Fri Apr 8 16:08:31 2022 +0200 NIFI-9875 In StandardProcessGroupSynchronizer.updateConnectionDestinations handle special case when output port is moved to a child process group while kept connected. (#5931) --- .../groups/StandardProcessGroupSynchronizer.java | 10 +- .../nifi/integration/versioned/ImportFlowIT.java | 281 +++++++-------------- 2 files changed, 103 insertions(+), 188 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java index cefaf22ada..8c5592dc18 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java @@ -250,7 +250,9 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize // Ensure that we create all Parameter Contexts before updating them. This is necessary in case the proposed incoming dataflow has // parameter contexts that inherit from one another and neither the inheriting nor inherited parameter context exists. - versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences); + if (versionedParameterContexts != null) { + versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences); + } updateParameterContext(group, proposed, versionedParameterContexts, context.getComponentIdGenerator()); updateVariableRegistry(group, proposed); @@ -559,7 +561,11 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize // Find the destination of the connection. If the destination doesn't yet exist (because it's part of the proposed Process Group but not yet added), // we will set the destination to a temporary destination. Then, after adding components, we will update the destinations again. Connectable newDestination = getConnectable(group, proposedConnection.getDestination()); - if (newDestination == null) { + if ( + newDestination == null + || + (newDestination.getConnectableType() == ConnectableType.OUTPUT_PORT && !newDestination.getProcessGroup().equals(connection.getProcessGroup())) + ) { final Funnel temporaryDestination = getTemporaryFunnel(connection.getProcessGroup()); LOG.debug("Updated Connection {} to have a temporary destination of {}", connection, temporaryDestination); newDestination = temporaryDestination; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java index 92f711e87a..2c82165e67 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java @@ -27,16 +27,9 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.StandardSnippet; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.VersionedConnection; -import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedExternalFlow; -import org.apache.nifi.flow.VersionedExternalFlowMetadata; -import org.apache.nifi.flow.VersionedFunnel; -import org.apache.nifi.flow.VersionedParameter; import org.apache.nifi.flow.VersionedParameterContext; -import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessGroup; -import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.integration.DirectInjectionExtensionManager; import org.apache.nifi.integration.FrameworkIntegrationTest; @@ -61,12 +54,13 @@ import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.util.FlowDifferenceFilters; import org.jetbrains.annotations.NotNull; import org.junit.Test; -import java.util.ArrayList; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -85,6 +79,14 @@ import static org.testng.Assert.assertNotNull; import static org.testng.AssertJUnit.assertNull; public class ImportFlowIT extends FrameworkIntegrationTest { + @Override + public void setup() throws IOException { + super.setup(); + + for (ProcessorNode processor : getRootGroup().getProcessors()) { + getRootGroup().removeProcessor(processor); + } + } @Override protected void injectExtensionTypes(final DirectInjectionExtensionManager extensionManager) { @@ -103,7 +105,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { processor.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS)); processor.setProperties(Collections.singletonMap(NopServiceReferencingProcessor.SERVICE.getName(), controllerService.getIdentifier())); - final VersionedExternalFlow proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null); + final VersionedExternalFlow proposedFlow = createFlowSnapshot(); // Create an Inner Process Group and update it to match the Versioned Flow. final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -147,7 +149,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "password")); // Create a VersionedExternalFlow that contains the processor - final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null); + final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot(); // Create child group final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -173,7 +175,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { assertEquals(DifferenceType.PROPERTY_PARAMETERIZED, differences.iterator().next().getDifferenceType()); // Create a Versioned Flow that contains the Parameter Reference. - final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null); + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(innerGroup); // Ensure no difference between the current configuration and the versioned flow differences = getLocalModifications(innerGroup, versionedFlowWithParameterReference); @@ -193,7 +195,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest { // Create a VersionedExternalFlow that contains the processor final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null); - final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter)); + setParameter(parameter); + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(); // Create child group final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -225,7 +228,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest { // Create a VersionedExternalFlow that contains the processor final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null); - final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter)); + setParameter(parameter); + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(); // Create child group final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -255,13 +259,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest { // Create a VersionedExternalFlow that contains the processor final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null); - final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), - Collections.singletonList(initialProcessor), Collections.singleton(parameter)); - + setParameter(parameter); + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(); // Update processor to have an explicit value for the second version of the flow. initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value")); - final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(); // Create child group and update to the first version of the flow, with parameter ref final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -290,7 +293,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass"); initialProcessor.setProperties(initialProperties); - final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(); // Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it. final Map<String, String> updatedProperties = new HashMap<>(); @@ -298,7 +301,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass"); initialProcessor.setProperties(updatedProperties); - final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(); // Create child group and update to the first version of the flow, with parameter ref final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -335,7 +338,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"); initialProcessor.setProperties(initialProperties); - final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(); // Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it. final Map<String, String> updatedProperties = new HashMap<>(); @@ -343,7 +346,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{other-param}"); initialProcessor.setProperties(updatedProperties); - final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(); // Create child group and update to the first version of the flow, with parameter ref final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -362,19 +365,16 @@ public class ImportFlowIT extends FrameworkIntegrationTest { @Test public void testChangeVersionFromExplicitValueToParameterSensitiveProperty() { // Create a processor with a sensitive property - final ProcessorNode processorWithParamRef = createProcessorNode(UsernamePasswordProcessor.class); - processorWithParamRef.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}")); - - final ProcessorNode processorWithExplicitValue = createProcessorNode(UsernamePasswordProcessor.class); - processorWithExplicitValue.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value")); - + final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class); + processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}")); // Create a VersionedExternalFlow that contains the processor final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null); - final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), - Collections.singletonList(processorWithParamRef), Collections.singleton(parameter)); + setParameter(parameter); + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(); - final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null); + processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value")); + final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(); // Create child group and update to the first version of the flow, with parameter ref final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -386,13 +386,13 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final ProcessorNode nodeInGroupWithRef = innerGroup.getProcessors().iterator().next(); assertNotNull(nodeInGroupWithRef.getProperty(UsernamePasswordProcessor.PASSWORD)); - // Update the flow to new version that uses explicit value. innerGroup.updateFlow(versionedFlowWithParameterReference, (String) null, true, true, true); // Updated flow has sensitive property that no longer references parameter. Now is an explicit value, so it should be unset final ProcessorNode nodeInGroupWithNoValue = innerGroup.getProcessors().iterator().next(); - assertEquals("#{secret-param}", nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue()); + String actual = nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue(); + assertEquals("#{secret-param}", actual); } @Test @@ -431,18 +431,18 @@ public class ImportFlowIT extends FrameworkIntegrationTest { assertTrue(groupA.getProcessors().isEmpty()); assertTrue(groupA.getConnections().isEmpty()); assertEquals(1, groupA.getInputPorts().size()); - assertEquals(port.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId()); + assertEquals(port.getName(), groupA.getInputPorts().stream().findFirst().get().getName()); //Change Process Group A version to Version 2 groupA.updateFlow(version2, null, false, true, true); //Process Group A should have a Process Group, a Processor and a Connection and no Input Ports assertEquals(1, groupA.getProcessGroups().size()); - assertEquals(groupB.getVersionedComponentId(), groupA.getProcessGroups().stream().findFirst().get().getVersionedComponentId()); + assertEquals(groupB.getName(), groupA.getProcessGroups().stream().findFirst().get().getName()); assertEquals(1, groupA.getProcessors().size()); - assertEquals(processor.getVersionedComponentId(), groupA.getProcessors().stream().findFirst().get().getVersionedComponentId()); + assertEquals(processor.getName(), groupA.getProcessors().stream().findFirst().get().getName()); assertEquals(1, groupA.getConnections().size()); - assertEquals(connection.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId()); + assertEquals(connection.getName(), groupA.getConnections().stream().findFirst().get().getName()); assertTrue(groupA.getInputPorts().isEmpty()); } @@ -487,9 +487,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest { assertEquals(1, group.getConnections().size()); assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId()); assertEquals(1, group.getOutputPorts().size()); - assertEquals(port.getVersionedComponentId(), group.getOutputPorts().stream().findFirst().get().getVersionedComponentId()); + assertEquals(port.getName(), group.getOutputPorts().stream().findFirst().get().getName()); assertTrue(group.getFunnels().isEmpty()); - assertEquals(connection.getDestination().getVersionedComponentId(), port.getVersionedComponentId()); + assertEquals(connection.getDestination().getName(), port.getName()); //Change Process Group version to Version 2 group.updateFlow(version2, null, false, true, true); @@ -497,12 +497,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest { //Process Group should have a Funnel, a Processor, a Connection and no Output Ports assertTrue(group.getOutputPorts().isEmpty()); assertEquals(1, group.getProcessors().size()); - assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId()); + assertEquals(processor.getName(), group.getProcessors().stream().findFirst().get().getName()); assertEquals(1, group.getConnections().size()); - assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId()); + assertEquals(connection.getName(), group.getConnections().stream().findFirst().get().getName()); assertEquals(1, group.getFunnels().size()); - assertEquals(funnel.getVersionedComponentId(), group.getFunnels().stream().findFirst().get().getVersionedComponentId()); - assertEquals(connection.getDestination().getVersionedComponentId(), funnel.getVersionedComponentId()); + assertEquals(funnel.getName(), group.getFunnels().stream().findFirst().get().getName()); + assertEquals(connection.getDestination().getName(), funnel.getName()); } @Test @@ -511,45 +511,45 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup()); //Create Process Group B under Process Group A - final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA); + final ProcessGroup groupBunderA = createProcessGroup("group-b-id", "Group B", groupA); //Add Input port under Process Group B - final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port"); - groupB.addInputPort(inputPort); + final Port inputPortBThenStayThenDelete = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port"); + groupBunderA.addInputPort(inputPortBThenStayThenDelete); //Add Processor 1 under Process Group A - final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA); + final ProcessorNode processorA1 = createProcessorNode(GenerateProcessor.class, groupA); //Add Processor 2 under Process Group A - final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA); + final ProcessorNode processorA2 = createProcessorNode(GenerateProcessor.class, groupA); //Add Output Port under Process Group A - final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port"); - groupA.addOutputPort(outputPort); + final Port outputPortAThenB = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port"); + groupA.addOutputPort(outputPortAThenB); //Connect Processor 1 and Output Port as Connection 1 - final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships()); + final Connection connectionProcessorA1ToOutputPortAThenProcessorA2 = connect(groupA, processorA1, outputPortAThenB, processorA1.getRelationships()); //Connect Processor 1 and Input Port as Connection 2 - final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships()); + final Connection connectionProcessorA1ToInputPortBThenStayThenDelete = connect(groupA, processorA1, inputPortBThenStayThenDelete, processorA1.getRelationships()); //Create a snapshot final VersionedExternalFlow version1 = createFlowSnapshot(groupA); //Modify Connection 1 to point to Processor 2 - connection1.setDestination(processor2); + connectionProcessorA1ToOutputPortAThenProcessorA2.setDestination(processorA2); //Move Output Port to Process Group B - moveOutputPort(outputPort, groupB); + moveOutputPort(outputPortAThenB, groupBunderA); //Create another snapshot final VersionedExternalFlow version2 = createFlowSnapshot(groupA); //Delete connection 2 - groupA.removeConnection(connection2); + groupA.removeConnection(connectionProcessorA1ToInputPortBThenStayThenDelete); //Delete Input Port - groupB.removeInputPort(inputPort); + groupBunderA.removeInputPort(inputPortBThenStayThenDelete); //Create another snapshot final VersionedExternalFlow version3 = createFlowSnapshot(groupA); @@ -560,27 +560,27 @@ public class ImportFlowIT extends FrameworkIntegrationTest { //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port assertEquals(2, groupA.getProcessors().size()); assertEquals(2, groupA.getConnections().size()); - assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId()); + assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getDestination().getName(), outputPortAThenB.getName()); assertEquals(1, groupA.getOutputPorts().size()); assertEquals(1, groupA.getProcessGroups().size()); - assertEquals(1, groupB.getInputPorts().size()); + assertEquals(1, groupBunderA.getInputPorts().size()); //Change Process Group version to Version 2 groupA.updateFlow(version2, null, false, true, true); //Connection1 destination changed to Processor2 and Output Port moved to Process Group B assertTrue(groupA.getOutputPorts().isEmpty()); - assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId()); - assertEquals(1, groupB.getOutputPorts().size()); - assertEquals(outputPort.getVersionedComponentId(), groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId()); + assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getDestination().getName(), processorA2.getName()); + assertEquals(1, groupBunderA.getOutputPorts().size()); + assertEquals(outputPortAThenB.getName(), groupBunderA.getOutputPorts().stream().findFirst().get().getName()); //Change Process Group version to Version 3 groupA.updateFlow(version3, null, false, true, true); //Connection2 and Input Port should be deleted assertEquals(1, groupA.getConnections().size()); - assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId()); - assertTrue(groupB.getInputPorts().isEmpty()); + assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getName(), groupA.getConnections().stream().findFirst().get().getName()); + assertTrue(groupBunderA.getInputPorts().isEmpty()); } @Test @@ -638,7 +638,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port assertEquals(2, groupA.getProcessors().size()); assertEquals(2, groupA.getConnections().size()); - assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId()); + assertEquals(connection1.getDestination().getName(), outputPort.getName()); assertEquals(1, groupA.getOutputPorts().size()); assertEquals(1, groupA.getProcessGroups().size()); assertEquals(1, groupB.getInputPorts().size()); @@ -647,7 +647,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { groupA.updateFlow(version2, null, false, true, true); //Connection1 destination changed to Processor2 and Output Port deleted - assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId()); + assertEquals(connection1.getDestination().getName(), processor2.getName()); assertTrue(groupA.getOutputPorts().isEmpty()); assertTrue(groupB.getOutputPorts().isEmpty()); @@ -656,10 +656,22 @@ public class ImportFlowIT extends FrameworkIntegrationTest { //Connection2 should be deleted and Input Port moved to Process Group A assertEquals(1, groupA.getConnections().size()); - assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId()); + assertEquals(connection1.getName(), groupA.getConnections().stream().findFirst().get().getName()); assertTrue(groupB.getInputPorts().isEmpty()); assertEquals(1, groupA.getInputPorts().size()); - assertEquals(inputPort.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId()); + assertEquals(inputPort.getName(), groupA.getInputPorts().stream().findFirst().get().getName()); + } + + private void setParameter(Parameter parameter) { + ParameterContext rootParameterContext = getFlowController().getFlowManager().getParameterContextManager().getParameterContext("unimportant"); + if (rootParameterContext == null) { + rootParameterContext = getFlowController().getFlowManager().createParameterContext("unimportant", "unimportant", Collections.emptyMap(), Collections.emptyList()); + getRootGroup().setParameterContext(rootParameterContext); + } + + Map<String, Parameter> parameterMap = new HashMap<>(); + parameterMap.put(parameter.getDescriptor().getName(), parameter); + rootParameterContext.setParameters(parameterMap); } private ProcessGroup createProcessGroup(final String groupId, final String groupName, final ProcessGroup destination) { @@ -707,137 +719,34 @@ public class ImportFlowIT extends FrameworkIntegrationTest { return differences; } - private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group, final List<ControllerServiceNode> controllerServices, - final List<ProcessorNode> processors, final Set<Parameter> parameters) { + private VersionedExternalFlow createFlowSnapshot() { + return createFlowSnapshot(getRootGroup()); + } + + private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) { createBundle(); final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(getExtensionManager()); - final List<ProcessorNode> processorNodes; - final List<ControllerServiceNode> controllerServiceNodes; - final List<Port> inputPorts; - final List<Port> outputPorts; - final List<Funnel> funnels; - final List<Connection> connections; - final List<ProcessGroup> processGroups; - final Set<VersionedProcessGroup> versionedProcessGroups; - - if (group == null) { - processorNodes = processors; - controllerServiceNodes = controllerServices; - inputPorts = Collections.EMPTY_LIST; - outputPorts = Collections.EMPTY_LIST; - funnels = Collections.EMPTY_LIST; - connections = Collections.EMPTY_LIST; - versionedProcessGroups = Collections.EMPTY_SET; - } else { - processorNodes = new ArrayList<>(group.getProcessors()); - controllerServiceNodes = new ArrayList<>(group.getControllerServices(false)); - inputPorts = new ArrayList<>(group.getInputPorts()); - outputPorts = new ArrayList<>(group.getOutputPorts()); - funnels = new ArrayList<>(group.getFunnels()); - connections = new ArrayList<>(group.getConnections()); - processGroups = new ArrayList<>(group.getProcessGroups()); - - final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(group, getFlowController().getControllerServiceProvider(),getFlowController().getFlowRegistryClient(),true); - processGroups.forEach(processGroup-> - versionedGroup.getProcessGroups().stream().filter(versionedProcessGroup -> versionedProcessGroup.getName().equals(processGroup.getName())) - .forEach(filteredProcessGroup -> processGroup.setVersionedComponentId(filteredProcessGroup.getIdentifier()))); - versionedProcessGroups = new HashSet<>(versionedGroup.getProcessGroups()); - } - - final Set<VersionedProcessor> versionedProcessors = new HashSet<>(); - for (final ProcessorNode processor : processorNodes) { - final VersionedProcessor versionedProcessor = flowMapper.mapProcessor(processor, getFlowController().getControllerServiceProvider(), Collections.emptySet(), new HashMap<>()); - versionedProcessors.add(versionedProcessor); - processor.setVersionedComponentId(versionedProcessor.getIdentifier()); - } - - final Set<VersionedControllerService> versionedServices = new HashSet<>(); - for (final ControllerServiceNode serviceNode : controllerServiceNodes) { - final VersionedControllerService versionedService = flowMapper.mapControllerService(serviceNode, getFlowController().getControllerServiceProvider(), - Collections.emptySet(), new HashMap<>()); - versionedServices.add(versionedService); - serviceNode.setVersionedComponentId(versionedService.getIdentifier()); - } - - final Set<VersionedPort> versionedInputPorts = new HashSet<>(); - for (final Port inputPort : inputPorts) { - final VersionedPort versionedInputPort = flowMapper.mapPort(inputPort); - versionedInputPorts.add(versionedInputPort); - inputPort.setVersionedComponentId(versionedInputPort.getIdentifier()); - } - - final Set<VersionedPort> versionedOutputPorts = new HashSet<>(); - for (final Port outputPort : outputPorts) { - final VersionedPort versionedOutputPort = flowMapper.mapPort(outputPort); - versionedOutputPorts.add(versionedOutputPort); - outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier()); - } - - final Set<VersionedFunnel> versionedFunnels = new HashSet<>(); - for (final Funnel funnel : funnels) { - final VersionedFunnel versionedFunnel = flowMapper.mapFunnel(funnel); - versionedFunnels.add(versionedFunnel); - funnel.setVersionedComponentId(versionedFunnel.getIdentifier()); - } + InstantiatedVersionedProcessGroup instantiatedVersionedProcessGroup = flowMapper.mapNonVersionedProcessGroup(group, getFlowController().getControllerServiceProvider()); + final VersionedExternalFlow flow = new VersionedExternalFlow(); + flow.setFlowContents(instantiatedVersionedProcessGroup); - final Set<VersionedConnection> versionedConnections = new HashSet<>(); - for (final Connection connection : connections) { - final VersionedConnection versionedConnection = flowMapper.mapConnection(connection); - versionedConnections.add(versionedConnection); - connection.setVersionedComponentId(versionedConnection.getIdentifier()); + Map<String, VersionedParameterContext> parameterContexts = new HashMap<>(); + if (getRootGroup().getParameterContext() != null) { + parameterContexts.put(getRootGroup().getParameterContext().getName(), flowMapper.mapParameterContext(getRootGroup().getParameterContext())); } - - final VersionedProcessGroup flowContents = createFlowContents(); - flowContents.setProcessors(versionedProcessors); - flowContents.setControllerServices(versionedServices); - flowContents.setProcessGroups(versionedProcessGroups); - flowContents.setInputPorts(versionedInputPorts); - flowContents.setOutputPorts(versionedOutputPorts); - flowContents.setFunnels(versionedFunnels); - flowContents.setConnections(versionedConnections); - - final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); - - final VersionedExternalFlowMetadata metadata = new VersionedExternalFlowMetadata(); - externalFlow.setMetadata(metadata); - metadata.setBucketIdentifier("unit-test-bucket"); - metadata.setFlowIdentifier("unit-test-flow"); - metadata.setVersion(1); - metadata.setFlowName("unit-test-flow"); - - if (parameters != null) { - final Set<VersionedParameter> versionedParameters = new HashSet<>(); - for (final Parameter parameter : parameters) { - final VersionedParameter versionedParameter = new VersionedParameter(); - versionedParameter.setName(parameter.getDescriptor().getName()); - versionedParameter.setValue(parameter.getValue()); - versionedParameter.setSensitive(parameter.getDescriptor().isSensitive()); - - versionedParameters.add(versionedParameter); + Set<ProcessGroup> childProcessGroups = getRootGroup().getProcessGroups(); + for (ProcessGroup processGroup : childProcessGroups) { + if (processGroup.getParameterContext() != null) { + parameterContexts.put(processGroup.getParameterContext().getName(), flowMapper.mapParameterContext(processGroup.getParameterContext())); } - - final VersionedParameterContext versionedParameterContext = new VersionedParameterContext(); - versionedParameterContext.setName("Unit Test Context"); - versionedParameterContext.setParameters(versionedParameters); - externalFlow.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext)); - - flowContents.setParameterContextName("Unit Test Context"); } + flow.setParameterContexts(parameterContexts); - return externalFlow; - } - - private VersionedExternalFlow createFlowSnapshot(final List<ControllerServiceNode> controllerServices, final List<ProcessorNode> processors, final Set<Parameter> parameters) { - return createFlowSnapshot(null, controllerServices, processors, parameters); - } - - private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) { - return createFlowSnapshot(group, Collections.emptyList(), Collections.emptyList(), null); + return flow; } - @NotNull private VersionedProcessGroup createFlowContents() { final VersionedProcessGroup flowContents = new VersionedProcessGroup();
