This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1cf4e72084 NIFI-9875 In 
StandardProcessGroupSynchronizer.updateConnectionDestinations handle special 
case when output port is moved to a child process group while kept connected. 
(#5931)
1cf4e72084 is described below

commit 1cf4e72084f30707ffb0676f77355a810c2d6e9e
Author: tpalfy <[email protected]>
AuthorDate: Fri Apr 8 16:08:31 2022 +0200

    NIFI-9875 In StandardProcessGroupSynchronizer.updateConnectionDestinations 
handle special case when output port is moved to a child process group while 
kept connected. (#5931)
---
 .../groups/StandardProcessGroupSynchronizer.java   |  10 +-
 .../nifi/integration/versioned/ImportFlowIT.java   | 281 +++++++--------------
 2 files changed, 103 insertions(+), 188 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index cefaf22ada..8c5592dc18 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -250,7 +250,9 @@ public class StandardProcessGroupSynchronizer implements 
ProcessGroupSynchronize
 
         // Ensure that we create all Parameter Contexts before updating them. 
This is necessary in case the proposed incoming dataflow has
         // parameter contexts that inherit from one another and neither the 
inheriting nor inherited parameter context exists.
-        
versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences);
+        if (versionedParameterContexts != null) {
+            
versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences);
+        }
 
         updateParameterContext(group, proposed, versionedParameterContexts, 
context.getComponentIdGenerator());
         updateVariableRegistry(group, proposed);
@@ -559,7 +561,11 @@ public class StandardProcessGroupSynchronizer implements 
ProcessGroupSynchronize
             // Find the destination of the connection. If the destination 
doesn't yet exist (because it's part of the proposed Process Group but not yet 
added),
             // we will set the destination to a temporary destination. Then, 
after adding components, we will update the destinations again.
             Connectable newDestination = getConnectable(group, 
proposedConnection.getDestination());
-            if (newDestination == null) {
+            if (
+                newDestination == null
+                ||
+                (newDestination.getConnectableType() == 
ConnectableType.OUTPUT_PORT && 
!newDestination.getProcessGroup().equals(connection.getProcessGroup()))
+            ) {
                 final Funnel temporaryDestination = 
getTemporaryFunnel(connection.getProcessGroup());
                 LOG.debug("Updated Connection {} to have a temporary 
destination of {}", connection, temporaryDestination);
                 newDestination = temporaryDestination;
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index 92f711e87a..2c82165e67 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -27,16 +27,9 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.Bundle;
-import org.apache.nifi.flow.VersionedConnection;
-import org.apache.nifi.flow.VersionedControllerService;
 import org.apache.nifi.flow.VersionedExternalFlow;
-import org.apache.nifi.flow.VersionedExternalFlowMetadata;
-import org.apache.nifi.flow.VersionedFunnel;
-import org.apache.nifi.flow.VersionedParameter;
 import org.apache.nifi.flow.VersionedParameterContext;
-import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.flow.VersionedProcessor;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.DirectInjectionExtensionManager;
 import org.apache.nifi.integration.FrameworkIntegrationTest;
@@ -61,12 +54,13 @@ import org.apache.nifi.registry.flow.diff.FlowComparison;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
 import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
 import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
 import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.util.FlowDifferenceFilters;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 
-import java.util.ArrayList;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -85,6 +79,14 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
 
 public class ImportFlowIT extends FrameworkIntegrationTest {
+    @Override
+    public void setup() throws IOException {
+        super.setup();
+
+        for (ProcessorNode processor : getRootGroup().getProcessors()) {
+            getRootGroup().removeProcessor(processor);
+        }
+    }
 
     @Override
     protected void injectExtensionTypes(final DirectInjectionExtensionManager 
extensionManager) {
@@ -103,7 +105,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         
processor.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS));
         
processor.setProperties(Collections.singletonMap(NopServiceReferencingProcessor.SERVICE.getName(),
 controllerService.getIdentifier()));
 
-        final VersionedExternalFlow proposedFlow = 
createFlowSnapshot(Collections.singletonList(controllerService), 
Collections.singletonList(processor), null);
+        final VersionedExternalFlow proposedFlow = createFlowSnapshot();
 
         // Create an Inner Process Group and update it to match the Versioned 
Flow.
         final ProcessGroup innerGroup = 
getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -147,7 +149,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(),
 "password"));
 
         // Create a VersionedExternalFlow that contains the processor
-        final VersionedExternalFlow versionedFlowWithExplicitValue = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(processor), null);
+        final VersionedExternalFlow versionedFlowWithExplicitValue = 
createFlowSnapshot();
 
         // Create child group
         final ProcessGroup innerGroup = 
getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -173,7 +175,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         assertEquals(DifferenceType.PROPERTY_PARAMETERIZED, 
differences.iterator().next().getDifferenceType());
 
         // Create a Versioned Flow that contains the Parameter Reference.
-        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(processor), null);
+        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot(innerGroup);
 
         // Ensure no difference between the current configuration and the 
versioned flow
         differences = getLocalModifications(innerGroup, 
versionedFlowWithParameterReference);
@@ -193,7 +195,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
 
         // Create a VersionedExternalFlow that contains the processor
         final Parameter parameter = new Parameter(new 
ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), 
null);
-        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(processor), Collections.singleton(parameter));
+        setParameter(parameter);
+        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot();
 
         // Create child group
         final ProcessGroup innerGroup = 
getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -225,7 +228,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
 
         // Create a VersionedExternalFlow that contains the processor
         final Parameter parameter = new Parameter(new 
ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), 
null);
-        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(processor), Collections.singleton(parameter));
+        setParameter(parameter);
+        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot();
 
         // Create child group
         final ProcessGroup innerGroup = 
getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -255,13 +259,12 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
 
         // Create a VersionedExternalFlow that contains the processor
         final Parameter parameter = new Parameter(new 
ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), 
null);
-        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot(Collections.emptyList(),
-            Collections.singletonList(initialProcessor), 
Collections.singleton(parameter));
-
+        setParameter(parameter);
+        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot();
 
         // Update processor to have an explicit value for the second version 
of the flow.
         
initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(),
 "secret-value"));
-        final VersionedExternalFlow versionedFlowExplicitValue = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow versionedFlowExplicitValue = 
createFlowSnapshot();
 
         // Create child group and update to the first version of the flow, 
with parameter ref
         final ProcessGroup innerGroup = 
getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -290,7 +293,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), 
"pass");
         initialProcessor.setProperties(initialProperties);
 
-        final VersionedExternalFlow initialVersionSnapshot = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow initialVersionSnapshot = 
createFlowSnapshot();
 
         // Update processor to have a different explicit value for both 
sensitive and non-sensitive properties and create a versioned flow for it.
         final Map<String, String> updatedProperties = new HashMap<>();
@@ -298,7 +301,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), 
"pass");
         initialProcessor.setProperties(updatedProperties);
 
-        final VersionedExternalFlow updatedVersionSnapshot = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow updatedVersionSnapshot = 
createFlowSnapshot();
 
         // Create child group and update to the first version of the flow, 
with parameter ref
         final ProcessGroup innerGroup = 
getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -335,7 +338,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), 
"#{secret-param}");
         initialProcessor.setProperties(initialProperties);
 
-        final VersionedExternalFlow initialVersionSnapshot = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow initialVersionSnapshot = 
createFlowSnapshot();
 
         // Update processor to have a different explicit value for both 
sensitive and non-sensitive properties and create a versioned flow for it.
         final Map<String, String> updatedProperties = new HashMap<>();
@@ -343,7 +346,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), 
"#{other-param}");
         initialProcessor.setProperties(updatedProperties);
 
-        final VersionedExternalFlow updatedVersionSnapshot = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow updatedVersionSnapshot = 
createFlowSnapshot();
 
         // Create child group and update to the first version of the flow, 
with parameter ref
         final ProcessGroup innerGroup = 
getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -362,19 +365,16 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
     @Test
     public void 
testChangeVersionFromExplicitValueToParameterSensitiveProperty() {
         // Create a processor with a sensitive property
-        final ProcessorNode processorWithParamRef = 
createProcessorNode(UsernamePasswordProcessor.class);
-        
processorWithParamRef.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(),
 "#{secret-param}"));
-
-        final ProcessorNode processorWithExplicitValue = 
createProcessorNode(UsernamePasswordProcessor.class);
-        
processorWithExplicitValue.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(),
 "secret-value"));
-
+        final ProcessorNode processor = 
createProcessorNode(UsernamePasswordProcessor.class);
+        
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(),
 "#{secret-param}"));
 
         // Create a VersionedExternalFlow that contains the processor
         final Parameter parameter = new Parameter(new 
ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), 
null);
-        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot(Collections.emptyList(),
-            Collections.singletonList(processorWithParamRef), 
Collections.singleton(parameter));
+        setParameter(parameter);
+        final VersionedExternalFlow versionedFlowWithParameterReference = 
createFlowSnapshot();
 
-        final VersionedExternalFlow versionedFlowExplicitValue = 
createFlowSnapshot(Collections.emptyList(), 
Collections.singletonList(processorWithExplicitValue), null);
+        
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(),
 "secret-value"));
+        final VersionedExternalFlow versionedFlowExplicitValue = 
createFlowSnapshot();
 
         // Create child group and update to the first version of the flow, 
with parameter ref
         final ProcessGroup innerGroup = 
getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -386,13 +386,13 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
         final ProcessorNode nodeInGroupWithRef = 
innerGroup.getProcessors().iterator().next();
         
assertNotNull(nodeInGroupWithRef.getProperty(UsernamePasswordProcessor.PASSWORD));
 
-
         // Update the flow to new version that uses explicit value.
         innerGroup.updateFlow(versionedFlowWithParameterReference, (String) 
null, true, true, true);
 
         // Updated flow has sensitive property that no longer references 
parameter. Now is an explicit value, so it should be unset
         final ProcessorNode nodeInGroupWithNoValue = 
innerGroup.getProcessors().iterator().next();
-        assertEquals("#{secret-param}", 
nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());
+        String actual = 
nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue();
+        assertEquals("#{secret-param}", actual);
     }
 
     @Test
@@ -431,18 +431,18 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
         assertTrue(groupA.getProcessors().isEmpty());
         assertTrue(groupA.getConnections().isEmpty());
         assertEquals(1, groupA.getInputPorts().size());
-        assertEquals(port.getVersionedComponentId(), 
groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(port.getName(), 
groupA.getInputPorts().stream().findFirst().get().getName());
 
         //Change Process Group A version to Version 2
         groupA.updateFlow(version2, null, false, true, true);
 
         //Process Group A should have a Process Group, a Processor and a 
Connection and no Input Ports
         assertEquals(1, groupA.getProcessGroups().size());
-        assertEquals(groupB.getVersionedComponentId(), 
groupA.getProcessGroups().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(groupB.getName(), 
groupA.getProcessGroups().stream().findFirst().get().getName());
         assertEquals(1, groupA.getProcessors().size());
-        assertEquals(processor.getVersionedComponentId(), 
groupA.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(processor.getName(), 
groupA.getProcessors().stream().findFirst().get().getName());
         assertEquals(1, groupA.getConnections().size());
-        assertEquals(connection.getVersionedComponentId(), 
groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connection.getName(), 
groupA.getConnections().stream().findFirst().get().getName());
         assertTrue(groupA.getInputPorts().isEmpty());
     }
 
@@ -487,9 +487,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         assertEquals(1, group.getConnections().size());
         assertEquals(connection.getVersionedComponentId(), 
group.getConnections().stream().findFirst().get().getVersionedComponentId());
         assertEquals(1, group.getOutputPorts().size());
-        assertEquals(port.getVersionedComponentId(), 
group.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(port.getName(), 
group.getOutputPorts().stream().findFirst().get().getName());
         assertTrue(group.getFunnels().isEmpty());
-        assertEquals(connection.getDestination().getVersionedComponentId(), 
port.getVersionedComponentId());
+        assertEquals(connection.getDestination().getName(), port.getName());
 
         //Change Process Group version to Version 2
         group.updateFlow(version2, null, false, true, true);
@@ -497,12 +497,12 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
         //Process Group should have a Funnel, a Processor, a Connection and no 
Output Ports
         assertTrue(group.getOutputPorts().isEmpty());
         assertEquals(1, group.getProcessors().size());
-        assertEquals(processor.getVersionedComponentId(), 
group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(processor.getName(), 
group.getProcessors().stream().findFirst().get().getName());
         assertEquals(1, group.getConnections().size());
-        assertEquals(connection.getVersionedComponentId(), 
group.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connection.getName(), 
group.getConnections().stream().findFirst().get().getName());
         assertEquals(1, group.getFunnels().size());
-        assertEquals(funnel.getVersionedComponentId(), 
group.getFunnels().stream().findFirst().get().getVersionedComponentId());
-        assertEquals(connection.getDestination().getVersionedComponentId(), 
funnel.getVersionedComponentId());
+        assertEquals(funnel.getName(), 
group.getFunnels().stream().findFirst().get().getName());
+        assertEquals(connection.getDestination().getName(), funnel.getName());
     }
 
     @Test
@@ -511,45 +511,45 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
         final ProcessGroup groupA = createProcessGroup("group-a-id", "Group 
A", getRootGroup());
 
         //Create Process Group B under Process Group A
-        final ProcessGroup groupB = createProcessGroup("group-b-id", "Group 
B", groupA);
+        final ProcessGroup groupBunderA = createProcessGroup("group-b-id", 
"Group B", groupA);
 
         //Add Input port under Process Group B
-        final Port inputPort = 
getFlowController().getFlowManager().createLocalInputPort("input-port-id", 
"Input Port");
-        groupB.addInputPort(inputPort);
+        final Port inputPortBThenStayThenDelete = 
getFlowController().getFlowManager().createLocalInputPort("input-port-id", 
"Input Port");
+        groupBunderA.addInputPort(inputPortBThenStayThenDelete);
 
         //Add Processor 1 under Process Group A
-        final ProcessorNode processor1 = 
createProcessorNode(GenerateProcessor.class, groupA);
+        final ProcessorNode processorA1 = 
createProcessorNode(GenerateProcessor.class, groupA);
 
         //Add Processor 2 under Process Group A
-        final ProcessorNode processor2 = 
createProcessorNode(GenerateProcessor.class, groupA);
+        final ProcessorNode processorA2 = 
createProcessorNode(GenerateProcessor.class, groupA);
 
         //Add Output Port under Process Group A
-        final Port outputPort = 
getFlowController().getFlowManager().createLocalOutputPort("output-port-id", 
"Output Port");
-        groupA.addOutputPort(outputPort);
+        final Port outputPortAThenB = 
getFlowController().getFlowManager().createLocalOutputPort("output-port-id", 
"Output Port");
+        groupA.addOutputPort(outputPortAThenB);
 
         //Connect Processor 1 and Output Port as Connection 1
-        final Connection connection1 = connect(groupA, processor1, outputPort, 
processor1.getRelationships());
+        final Connection connectionProcessorA1ToOutputPortAThenProcessorA2 = 
connect(groupA, processorA1, outputPortAThenB, processorA1.getRelationships());
 
         //Connect Processor 1 and Input Port as Connection 2
-        final Connection connection2 = connect(groupA, processor1, inputPort, 
processor1.getRelationships());
+        final Connection connectionProcessorA1ToInputPortBThenStayThenDelete = 
connect(groupA, processorA1, inputPortBThenStayThenDelete, 
processorA1.getRelationships());
 
         //Create a snapshot
         final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
 
         //Modify Connection 1 to point to Processor 2
-        connection1.setDestination(processor2);
+        
connectionProcessorA1ToOutputPortAThenProcessorA2.setDestination(processorA2);
 
         //Move Output Port to Process Group B
-        moveOutputPort(outputPort, groupB);
+        moveOutputPort(outputPortAThenB, groupBunderA);
 
         //Create another snapshot
         final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
 
         //Delete connection 2
-        groupA.removeConnection(connection2);
+        
groupA.removeConnection(connectionProcessorA1ToInputPortBThenStayThenDelete);
 
         //Delete Input Port
-        groupB.removeInputPort(inputPort);
+        groupBunderA.removeInputPort(inputPortBThenStayThenDelete);
 
         //Create another snapshot
         final VersionedExternalFlow version3 = createFlowSnapshot(groupA);
@@ -560,27 +560,27 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
         //Process Group A should have two Processors, 2 Connections, one 
Output Port and one Process Group with one Input Port
         assertEquals(2, groupA.getProcessors().size());
         assertEquals(2, groupA.getConnections().size());
-        assertEquals(connection1.getDestination().getVersionedComponentId(), 
outputPort.getVersionedComponentId());
+        
assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getDestination().getName(),
 outputPortAThenB.getName());
         assertEquals(1, groupA.getOutputPorts().size());
         assertEquals(1, groupA.getProcessGroups().size());
-        assertEquals(1, groupB.getInputPorts().size());
+        assertEquals(1, groupBunderA.getInputPorts().size());
 
         //Change Process Group version to Version 2
         groupA.updateFlow(version2, null, false, true, true);
 
         //Connection1 destination changed to Processor2 and Output Port moved 
to Process Group B
         assertTrue(groupA.getOutputPorts().isEmpty());
-        assertEquals(connection1.getDestination().getVersionedComponentId(), 
processor2.getVersionedComponentId());
-        assertEquals(1, groupB.getOutputPorts().size());
-        assertEquals(outputPort.getVersionedComponentId(), 
groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+        
assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getDestination().getName(),
 processorA2.getName());
+        assertEquals(1, groupBunderA.getOutputPorts().size());
+        assertEquals(outputPortAThenB.getName(), 
groupBunderA.getOutputPorts().stream().findFirst().get().getName());
 
         //Change Process Group version to Version 3
         groupA.updateFlow(version3, null, false, true, true);
 
         //Connection2 and Input Port should be deleted
         assertEquals(1, groupA.getConnections().size());
-        assertEquals(connection1.getVersionedComponentId(), 
groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
-        assertTrue(groupB.getInputPorts().isEmpty());
+        
assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getName(), 
groupA.getConnections().stream().findFirst().get().getName());
+        assertTrue(groupBunderA.getInputPorts().isEmpty());
     }
 
     @Test
@@ -638,7 +638,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         //Process Group A should have two Processors, 2 Connections, one 
Output Port and one Process Group with one Input Port
         assertEquals(2, groupA.getProcessors().size());
         assertEquals(2, groupA.getConnections().size());
-        assertEquals(connection1.getDestination().getVersionedComponentId(), 
outputPort.getVersionedComponentId());
+        assertEquals(connection1.getDestination().getName(), 
outputPort.getName());
         assertEquals(1, groupA.getOutputPorts().size());
         assertEquals(1, groupA.getProcessGroups().size());
         assertEquals(1, groupB.getInputPorts().size());
@@ -647,7 +647,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         groupA.updateFlow(version2, null, false, true, true);
 
         //Connection1 destination changed to Processor2 and Output Port deleted
-        assertEquals(connection1.getDestination().getVersionedComponentId(), 
processor2.getVersionedComponentId());
+        assertEquals(connection1.getDestination().getName(), 
processor2.getName());
         assertTrue(groupA.getOutputPorts().isEmpty());
         assertTrue(groupB.getOutputPorts().isEmpty());
 
@@ -656,10 +656,22 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
 
         //Connection2 should be deleted and Input Port moved to Process Group A
         assertEquals(1, groupA.getConnections().size());
-        assertEquals(connection1.getVersionedComponentId(), 
groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connection1.getName(), 
groupA.getConnections().stream().findFirst().get().getName());
         assertTrue(groupB.getInputPorts().isEmpty());
         assertEquals(1, groupA.getInputPorts().size());
-        assertEquals(inputPort.getVersionedComponentId(), 
groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(inputPort.getName(), 
groupA.getInputPorts().stream().findFirst().get().getName());
+    }
+
+    private void setParameter(Parameter parameter) {
+        ParameterContext rootParameterContext = 
getFlowController().getFlowManager().getParameterContextManager().getParameterContext("unimportant");
+        if (rootParameterContext == null) {
+            rootParameterContext = 
getFlowController().getFlowManager().createParameterContext("unimportant", 
"unimportant", Collections.emptyMap(), Collections.emptyList());
+            getRootGroup().setParameterContext(rootParameterContext);
+        }
+
+        Map<String, Parameter> parameterMap = new HashMap<>();
+        parameterMap.put(parameter.getDescriptor().getName(), parameter);
+        rootParameterContext.setParameters(parameterMap);
     }
 
     private ProcessGroup createProcessGroup(final String groupId, final String 
groupName, final ProcessGroup destination) {
@@ -707,137 +719,34 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
         return differences;
     }
 
-    private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group, 
final List<ControllerServiceNode> controllerServices,
-                                                     final List<ProcessorNode> 
processors, final Set<Parameter> parameters) {
+    private VersionedExternalFlow createFlowSnapshot() {
+        return createFlowSnapshot(getRootGroup());
+    }
+
+    private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) 
{
         createBundle();
 
         final NiFiRegistryFlowMapper flowMapper = new 
NiFiRegistryFlowMapper(getExtensionManager());
 
-        final List<ProcessorNode> processorNodes;
-        final List<ControllerServiceNode> controllerServiceNodes;
-        final List<Port> inputPorts;
-        final List<Port> outputPorts;
-        final List<Funnel> funnels;
-        final List<Connection> connections;
-        final List<ProcessGroup> processGroups;
-        final Set<VersionedProcessGroup> versionedProcessGroups;
-
-        if (group == null) {
-            processorNodes = processors;
-            controllerServiceNodes = controllerServices;
-            inputPorts = Collections.EMPTY_LIST;
-            outputPorts = Collections.EMPTY_LIST;
-            funnels = Collections.EMPTY_LIST;
-            connections = Collections.EMPTY_LIST;
-            versionedProcessGroups = Collections.EMPTY_SET;
-        } else {
-            processorNodes = new ArrayList<>(group.getProcessors());
-            controllerServiceNodes = new 
ArrayList<>(group.getControllerServices(false));
-            inputPorts = new ArrayList<>(group.getInputPorts());
-            outputPorts = new ArrayList<>(group.getOutputPorts());
-            funnels = new ArrayList<>(group.getFunnels());
-            connections = new ArrayList<>(group.getConnections());
-            processGroups = new ArrayList<>(group.getProcessGroups());
-
-            final VersionedProcessGroup versionedGroup = 
flowMapper.mapProcessGroup(group, 
getFlowController().getControllerServiceProvider(),getFlowController().getFlowRegistryClient(),true);
-            processGroups.forEach(processGroup->
-                
versionedGroup.getProcessGroups().stream().filter(versionedProcessGroup -> 
versionedProcessGroup.getName().equals(processGroup.getName()))
-                        .forEach(filteredProcessGroup -> 
processGroup.setVersionedComponentId(filteredProcessGroup.getIdentifier())));
-            versionedProcessGroups = new 
HashSet<>(versionedGroup.getProcessGroups());
-        }
-
-        final Set<VersionedProcessor> versionedProcessors = new HashSet<>();
-        for (final ProcessorNode processor : processorNodes) {
-            final VersionedProcessor versionedProcessor = 
flowMapper.mapProcessor(processor, 
getFlowController().getControllerServiceProvider(), Collections.emptySet(), new 
HashMap<>());
-            versionedProcessors.add(versionedProcessor);
-            
processor.setVersionedComponentId(versionedProcessor.getIdentifier());
-        }
-
-        final Set<VersionedControllerService> versionedServices = new 
HashSet<>();
-        for (final ControllerServiceNode serviceNode : controllerServiceNodes) 
{
-            final VersionedControllerService versionedService = 
flowMapper.mapControllerService(serviceNode, 
getFlowController().getControllerServiceProvider(),
-                    Collections.emptySet(), new HashMap<>());
-            versionedServices.add(versionedService);
-            
serviceNode.setVersionedComponentId(versionedService.getIdentifier());
-        }
-
-        final Set<VersionedPort> versionedInputPorts = new HashSet<>();
-        for (final Port inputPort : inputPorts) {
-            final VersionedPort versionedInputPort = 
flowMapper.mapPort(inputPort);
-            versionedInputPorts.add(versionedInputPort);
-            
inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
-        }
-
-        final Set<VersionedPort> versionedOutputPorts = new HashSet<>();
-        for (final Port outputPort : outputPorts) {
-            final VersionedPort versionedOutputPort = 
flowMapper.mapPort(outputPort);
-            versionedOutputPorts.add(versionedOutputPort);
-            
outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier());
-        }
-
-        final Set<VersionedFunnel> versionedFunnels = new HashSet<>();
-        for (final Funnel funnel : funnels) {
-            final VersionedFunnel versionedFunnel = 
flowMapper.mapFunnel(funnel);
-            versionedFunnels.add(versionedFunnel);
-            funnel.setVersionedComponentId(versionedFunnel.getIdentifier());
-        }
+        InstantiatedVersionedProcessGroup instantiatedVersionedProcessGroup = 
flowMapper.mapNonVersionedProcessGroup(group, 
getFlowController().getControllerServiceProvider());
+        final VersionedExternalFlow flow = new VersionedExternalFlow();
+        flow.setFlowContents(instantiatedVersionedProcessGroup);
 
-        final Set<VersionedConnection> versionedConnections = new HashSet<>();
-        for (final Connection connection : connections) {
-            final VersionedConnection versionedConnection = 
flowMapper.mapConnection(connection);
-            versionedConnections.add(versionedConnection);
-            
connection.setVersionedComponentId(versionedConnection.getIdentifier());
+        Map<String, VersionedParameterContext> parameterContexts = new 
HashMap<>();
+        if (getRootGroup().getParameterContext() != null) {
+            
parameterContexts.put(getRootGroup().getParameterContext().getName(), 
flowMapper.mapParameterContext(getRootGroup().getParameterContext()));
         }
-
-        final VersionedProcessGroup flowContents = createFlowContents();
-        flowContents.setProcessors(versionedProcessors);
-        flowContents.setControllerServices(versionedServices);
-        flowContents.setProcessGroups(versionedProcessGroups);
-        flowContents.setInputPorts(versionedInputPorts);
-        flowContents.setOutputPorts(versionedOutputPorts);
-        flowContents.setFunnels(versionedFunnels);
-        flowContents.setConnections(versionedConnections);
-
-        final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
-
-        final VersionedExternalFlowMetadata metadata = new 
VersionedExternalFlowMetadata();
-        externalFlow.setMetadata(metadata);
-        metadata.setBucketIdentifier("unit-test-bucket");
-        metadata.setFlowIdentifier("unit-test-flow");
-        metadata.setVersion(1);
-        metadata.setFlowName("unit-test-flow");
-
-        if (parameters != null) {
-            final Set<VersionedParameter> versionedParameters = new 
HashSet<>();
-            for (final Parameter parameter : parameters) {
-                final VersionedParameter versionedParameter = new 
VersionedParameter();
-                
versionedParameter.setName(parameter.getDescriptor().getName());
-                versionedParameter.setValue(parameter.getValue());
-                
versionedParameter.setSensitive(parameter.getDescriptor().isSensitive());
-
-                versionedParameters.add(versionedParameter);
+        Set<ProcessGroup> childProcessGroups = 
getRootGroup().getProcessGroups();
+        for (ProcessGroup processGroup : childProcessGroups) {
+            if (processGroup.getParameterContext() != null) {
+                
parameterContexts.put(processGroup.getParameterContext().getName(), 
flowMapper.mapParameterContext(processGroup.getParameterContext()));
             }
-
-            final VersionedParameterContext versionedParameterContext = new 
VersionedParameterContext();
-            versionedParameterContext.setName("Unit Test Context");
-            versionedParameterContext.setParameters(versionedParameters);
-            
externalFlow.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(),
 versionedParameterContext));
-
-            flowContents.setParameterContextName("Unit Test Context");
         }
+        flow.setParameterContexts(parameterContexts);
 
-        return externalFlow;
-    }
-
-    private VersionedExternalFlow createFlowSnapshot(final 
List<ControllerServiceNode> controllerServices, final List<ProcessorNode> 
processors, final Set<Parameter> parameters) {
-        return createFlowSnapshot(null, controllerServices, processors, 
parameters);
-    }
-
-    private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) 
{
-        return createFlowSnapshot(group, Collections.emptyList(), 
Collections.emptyList(), null);
+        return flow;
     }
 
-
     @NotNull
     private VersionedProcessGroup createFlowContents() {
         final VersionedProcessGroup flowContents = new VersionedProcessGroup();

Reply via email to