This is an automated email from the ASF dual-hosted git repository.

denes pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 72660af  NIFI-9229 Flow upgrade not possible if a Output Port changes 
to a funnel (#5402)
72660af is described below

commit 72660af479d726061eb76854398cb88ae4f25c30
Author: timeabarna <[email protected]>
AuthorDate: Mon Oct 4 09:14:36 2021 +0200

    NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel 
(#5402)
    
    * NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel
    * NIFI-9229 Addressing review comments modified log message and added 
comments
---
 .../apache/nifi/groups/StandardProcessGroup.java   |  37 ++-
 .../nifi/integration/versioned/ImportFlowIT.java   | 248 +++++++++++++++++++++
 2 files changed, 281 insertions(+), 4 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index f933cf7..9c8d264 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -158,6 +158,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -4293,16 +4294,30 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         //As Input Port (IP1) originally belonged to PGA the new connection 
would be incorrectly linked to the old Input Port
         //instead of the one being in PGB, so it needs to be removed first 
before updating the connections.
 
-        for (final String removedVersionedId : inputPortsRemoved) {
+        Iterator<String> inputPortsRemovedIterator = 
inputPortsRemoved.iterator();
+        while (inputPortsRemovedIterator.hasNext()) {
+            final String removedVersionedId = inputPortsRemovedIterator.next();
             final Port port = inputPortsByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", port, group);
-            group.removeInputPort(port);
+            try {
+                group.removeInputPort(port);
+                inputPortsRemovedIterator.remove();
+            } catch (IllegalStateException e) {
+                LOG.info("Removing {} from {} not possible at the moment, will 
try again after updated the connections.", port, group);
+            }
         }
 
-        for (final String removedVersionedId : outputPortsRemoved) {
+        Iterator<String> outputPortsRemovedIterator = 
outputPortsRemoved.iterator();
+        while (outputPortsRemovedIterator.hasNext()) {
+            final String removedVersionedId = 
outputPortsRemovedIterator.next();
             final Port port = outputPortsByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", port, group);
-            group.removeOutputPort(port);
+            try {
+                group.removeOutputPort(port);
+                outputPortsRemovedIterator.remove();
+            } catch (IllegalStateException e) {
+                LOG.info("Removing {} from {} not possible at the moment, will 
try again after updated the connections.", port, group);
+            }
         }
 
         // Add and update Connections
@@ -4343,6 +4358,20 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             group.removeFunnel(funnel);
         }
 
+        //Removing remaining input ports
+        for (final String removedVersionedId : inputPortsRemoved) {
+            final Port port = inputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeInputPort(port);
+        }
+
+        //Removing remaining output ports
+        for (final String removedVersionedId : outputPortsRemoved) {
+            final Port port = outputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeOutputPort(port);
+        }
+
         // Now that all input/output ports have been removed, we should be 
able to update
         // all ports to the final name that was proposed in the new flow 
version.
         for (final Map.Entry<Port, String> portAndFinalName : 
proposedPortFinalNames.entrySet()) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index c4a00b3..cdf7a65 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -20,12 +20,14 @@ import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedFunnel;
 import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.DirectInjectionExtensionManager;
@@ -443,6 +445,222 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
         assertTrue(groupA.getInputPorts().isEmpty());
     }
 
+    @Test
+    public void testUpdateFlowWithOutputPortChangedToFunnelInAConnection() {
+        //Testing use case NIFI-9229
+        //Create Process Group
+        final ProcessGroup group = createProcessGroup("p-group-id", "P Group", 
getRootGroup());
+
+        //Create Processor under Process Group
+        final ProcessorNode processor = 
createProcessorNode(GenerateProcessor.class, group);
+
+        //Add Output Port to Process Group
+        final Port port = 
getFlowController().getFlowManager().createLocalOutputPort("output-port-id", 
"Output Port");
+        group.addOutputPort(port);
+
+        //Create Connection between Processor and Input Port
+        final Connection connection = connect(group, processor, port, 
processor.getRelationships());
+
+        //Create a snapshot
+        final VersionedFlowSnapshot version1 = createFlowSnapshot(group);
+
+        //Create Funnel under Process Group
+        Funnel funnel = 
getFlowController().getFlowManager().createFunnel("funnel-id");
+        group.addFunnel(funnel);
+
+        //Modify connection's destination from Output Port to Funnel
+        connection.setDestination(funnel);
+
+        //Delete Output Port
+        group.removeOutputPort(port);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version2 = createFlowSnapshot(group);
+
+        //Change Process Group version to Version 1
+        group.updateFlow(version1, null, false, true, true);
+
+        //Process Group should have only one Output Port, One Processor and 
One connection
+        assertEquals(1, group.getProcessors().size());
+        assertEquals(processor.getVersionedComponentId(), 
group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, group.getConnections().size());
+        assertEquals(connection.getVersionedComponentId(), 
group.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, group.getOutputPorts().size());
+        assertEquals(port.getVersionedComponentId(), 
group.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertTrue(group.getFunnels().isEmpty());
+        assertEquals(connection.getDestination().getVersionedComponentId(), 
port.getVersionedComponentId());
+
+        //Change Process Group version to Version 2
+        group.updateFlow(version2, null, false, true, true);
+
+        //Process Group should have a Funnel, a Processor, a Connection and no 
Output Ports
+        assertTrue(group.getOutputPorts().isEmpty());
+        assertEquals(1, group.getProcessors().size());
+        assertEquals(processor.getVersionedComponentId(), 
group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, group.getConnections().size());
+        assertEquals(connection.getVersionedComponentId(), 
group.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, group.getFunnels().size());
+        assertEquals(funnel.getVersionedComponentId(), 
group.getFunnels().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connection.getDestination().getVersionedComponentId(), 
funnel.getVersionedComponentId());
+    }
+
+    @Test
+    public void testUpdateFlowWithModifyingConnectionDeletingAndMovingPort() {
+        //Create Process Group A
+        final ProcessGroup groupA = createProcessGroup("group-a-id", "Group 
A", getRootGroup());
+
+        //Create Process Group B under Process Group A
+        final ProcessGroup groupB = createProcessGroup("group-b-id", "Group 
B", groupA);
+
+        //Add Input port under Process Group B
+        final Port inputPort = 
getFlowController().getFlowManager().createLocalInputPort("input-port-id", 
"Input Port");
+        groupB.addInputPort(inputPort);
+
+        //Add Processor 1 under Process Group A
+        final ProcessorNode processor1 = 
createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Add Processor 2 under Process Group A
+        final ProcessorNode processor2 = 
createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Add Output Port under Process Group A
+        final Port outputPort = 
getFlowController().getFlowManager().createLocalOutputPort("output-port-id", 
"Output Port");
+        groupA.addOutputPort(outputPort);
+
+        //Connect Processor 1 and Output Port as Connection 1
+        final Connection connection1 = connect(groupA, processor1, outputPort, 
processor1.getRelationships());
+
+        //Connect Processor 1 and Input Port as Connection 2
+        final Connection connection2 = connect(groupA, processor1, inputPort, 
processor1.getRelationships());
+
+        //Create a snapshot
+        final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+        //Modify Connection 1 to point to Processor 2
+        connection1.setDestination(processor2);
+
+        //Move Output Port to Process Group B
+        moveOutputPort(outputPort, groupB);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+        //Delete connection 2
+        groupA.removeConnection(connection2);
+
+        //Delete Input Port
+        groupB.removeInputPort(inputPort);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
+
+        //Change Process Group version to Version 1
+        groupA.updateFlow(version1, null, false, true, true);
+
+        //Process Group A should have two Processors, 2 Connections, one 
Output Port and one Process Group with one Input Port
+        assertEquals(2, groupA.getProcessors().size());
+        assertEquals(2, groupA.getConnections().size());
+        assertEquals(connection1.getDestination().getVersionedComponentId(), 
outputPort.getVersionedComponentId());
+        assertEquals(1, groupA.getOutputPorts().size());
+        assertEquals(1, groupA.getProcessGroups().size());
+        assertEquals(1, groupB.getInputPorts().size());
+
+        //Change Process Group version to Version 2
+        groupA.updateFlow(version2, null, false, true, true);
+
+        //Connection1 destination changed to Processor2 and Output Port moved 
to Process Group B
+        assertTrue(groupA.getOutputPorts().isEmpty());
+        assertEquals(connection1.getDestination().getVersionedComponentId(), 
processor2.getVersionedComponentId());
+        assertEquals(1, groupB.getOutputPorts().size());
+        assertEquals(outputPort.getVersionedComponentId(), 
groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+
+        //Change Process Group version to Version 3
+        groupA.updateFlow(version3, null, false, true, true);
+
+        //Connection2 and Input Port should be deleted
+        assertEquals(1, groupA.getConnections().size());
+        assertEquals(connection1.getVersionedComponentId(), 
groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertTrue(groupB.getInputPorts().isEmpty());
+    }
+
+    @Test
+    public void testUpdateFlowWithDeletingConnectionDeletingAndMovingPort() {
+        //Create Process Group A
+        final ProcessGroup groupA = createProcessGroup("group-a-id", "Group 
A", getRootGroup());
+
+        //Create Process Group B under Process Group A
+        final ProcessGroup groupB = createProcessGroup("group-b-id", "Group 
B", groupA);
+
+        //Add Input port under Process Group B
+        final Port inputPort = 
getFlowController().getFlowManager().createLocalInputPort("input-port-id", 
"Input Port");
+        groupB.addInputPort(inputPort);
+
+        //Add Processor 1 under Process Group A
+        final ProcessorNode processor1 = 
createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Add Processor 2 under Process Group A
+        final ProcessorNode processor2 = 
createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Add Output Port under Process Group A
+        final Port outputPort = 
getFlowController().getFlowManager().createLocalOutputPort("output-port-id", 
"Output Port");
+        groupA.addOutputPort(outputPort);
+
+        //Connect Processor 1 and Output Port as Connection 1
+        final Connection connection1 = connect(groupA, processor1, outputPort, 
processor1.getRelationships());
+
+        //Connect Processor 1 and Input Port as Connection 2
+        final Connection connection2 = connect(groupA, processor1, inputPort, 
processor1.getRelationships());
+
+        //Create a snapshot
+        final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+        //Modify Connection 1 to point to Processor 2
+        connection1.setDestination(processor2);
+
+        //Delete Output Port
+        groupA.removeOutputPort(outputPort);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+        //Delete connection 2
+        groupA.removeConnection(connection2);
+
+        //Move Input Port to Process Group A
+        moveInputPort(inputPort, groupA);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
+
+        //Change Process Group version to Version 1
+        groupA.updateFlow(version1, null, false, true, true);
+
+        //Process Group A should have two Processors, 2 Connections, one 
Output Port and one Process Group with one Input Port
+        assertEquals(2, groupA.getProcessors().size());
+        assertEquals(2, groupA.getConnections().size());
+        assertEquals(connection1.getDestination().getVersionedComponentId(), 
outputPort.getVersionedComponentId());
+        assertEquals(1, groupA.getOutputPorts().size());
+        assertEquals(1, groupA.getProcessGroups().size());
+        assertEquals(1, groupB.getInputPorts().size());
+
+        //Change Process Group version to Version 2
+        groupA.updateFlow(version2, null, false, true, true);
+
+        //Connection1 destination changed to Processor2 and Output Port deleted
+        assertEquals(connection1.getDestination().getVersionedComponentId(), 
processor2.getVersionedComponentId());
+        assertTrue(groupA.getOutputPorts().isEmpty());
+        assertTrue(groupB.getOutputPorts().isEmpty());
+
+        //Change Process Group version to Version 3
+        groupA.updateFlow(version3, null, false, true, true);
+
+        //Connection2 should be deleted and Input Port moved to Process Group A
+        assertEquals(1, groupA.getConnections().size());
+        assertEquals(connection1.getVersionedComponentId(), 
groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertTrue(groupB.getInputPorts().isEmpty());
+        assertEquals(1, groupA.getInputPorts().size());
+        assertEquals(inputPort.getVersionedComponentId(), 
groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+    }
+
     private ProcessGroup createProcessGroup(final String groupId, final String 
groupName, final ProcessGroup destination) {
         final ProcessGroup group = 
getFlowController().getFlowManager().createProcessGroup(groupId);
         group.setName(groupName);
@@ -458,6 +676,14 @@ public class ImportFlowIT extends FrameworkIntegrationTest 
{
         port.getProcessGroup().move(snippet, destination);
     }
 
+    private void moveOutputPort(final Port port, final ProcessGroup 
destination) {
+        final StandardSnippet snippet = new StandardSnippet();
+        snippet.setParentGroupId(port.getProcessGroupIdentifier());
+        snippet.addOutputPorts(Collections.singletonMap(port.getIdentifier(), 
null));
+
+        port.getProcessGroup().move(snippet, destination);
+    }
+
 
     private Set<FlowDifference> getLocalModifications(final ProcessGroup 
processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) {
         final NiFiRegistryFlowMapper mapper = new 
NiFiRegistryFlowMapper(getFlowController().getExtensionManager());
@@ -495,6 +721,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         final List<ProcessorNode> processorNodes;
         final List<ControllerServiceNode> controllerServiceNodes;
         final List<Port> inputPorts;
+        final List<Port> outputPorts;
+        final List<Funnel> funnels;
         final List<Connection> connections;
         final List<ProcessGroup> processGroups;
         final Set<VersionedProcessGroup> versionedProcessGroups;
@@ -503,12 +731,16 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
             processorNodes = processors;
             controllerServiceNodes = controllerServices;
             inputPorts = Collections.EMPTY_LIST;
+            outputPorts = Collections.EMPTY_LIST;
+            funnels = Collections.EMPTY_LIST;
             connections = Collections.EMPTY_LIST;
             versionedProcessGroups = Collections.EMPTY_SET;
         } else {
             processorNodes = new ArrayList<>(group.getProcessors());
             controllerServiceNodes = new 
ArrayList<>(group.getControllerServices(false));
             inputPorts = new ArrayList<>(group.getInputPorts());
+            outputPorts = new ArrayList<>(group.getOutputPorts());
+            funnels = new ArrayList<>(group.getFunnels());
             connections = new ArrayList<>(group.getConnections());
             processGroups = new ArrayList<>(group.getProcessGroups());
 
@@ -541,6 +773,20 @@ public class ImportFlowIT extends FrameworkIntegrationTest 
{
             
inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
         }
 
+        final Set<VersionedPort> versionedOutputPorts = new HashSet<>();
+        for (final Port outputPort : outputPorts) {
+            final VersionedPort versionedOutputPort = 
flowMapper.mapPort(outputPort);
+            versionedOutputPorts.add(versionedOutputPort);
+            
outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier());
+        }
+
+        final Set<VersionedFunnel> versionedFunnels = new HashSet<>();
+        for (final Funnel funnel : funnels) {
+            final VersionedFunnel versionedFunnel = 
flowMapper.mapFunnel(funnel);
+            versionedFunnels.add(versionedFunnel);
+            funnel.setVersionedComponentId(versionedFunnel.getIdentifier());
+        }
+
         final Set<VersionedConnection> versionedConnections = new HashSet<>();
         for (final Connection connection : connections) {
             final VersionedConnection versionedConnection = 
flowMapper.mapConnection(connection);
@@ -553,6 +799,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         flowContents.setControllerServices(versionedServices);
         flowContents.setProcessGroups(versionedProcessGroups);
         flowContents.setInputPorts(versionedInputPorts);
+        flowContents.setOutputPorts(versionedOutputPorts);
+        flowContents.setFunnels(versionedFunnels);
         flowContents.setConnections(versionedConnections);
 
         final VersionedFlowSnapshot versionedFlowSnapshot = 
createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);

Reply via email to