This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 23d6d6ede4 NIFI-11251: Fixed case in which changing Flow Version was 
improperly attempting to use a Temporary Funnel destination for a connection 
due to differences in Group ID by instead checking if the destination is 
reachable; fixed issue in which processors in an inner versioned group were not 
stopped even though they changed, when a higher-level parent (that was itself 
under version control) had its version changed. Wrote system tests to verify
23d6d6ede4 is described below

commit 23d6d6ede4f3d854e67bada3daddf77bfc7db57d
Author: Mark Payne <[email protected]>
AuthorDate: Tue Mar 28 15:23:26 2023 -0400

    NIFI-11251: Fixed case in which changing Flow Version was improperly 
attempting to use a Temporary Funnel destination for a connection due to 
differences in Group ID by instead checking if the destination is reachable; 
fixed issue in which processors in an inner versioned group were not stopped 
even though they changed, when a higher-level parent (that was itself under 
version control) had its version changed. Wrote system tests to verify
    
    - Fixed system tests so that they work properly in Clustered version of 
RegistryClientIT
    - Fixed system test - ensure that we wait for processors to become valid 
before attempting to start them; also added an additional system test around 
Controller Services in versioned flows
    
    This closes #7095
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../StandardVersionedComponentSynchronizer.java    |  45 ++++---
 .../apache/nifi/web/StandardNiFiServiceFacade.java |   2 +-
 .../nifi/cs/tests/system/StandardCountService.java |   1 +
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  36 ++++-
 .../tests/system/registry/RegistryClientIT.java    | 149 ++++++++++++++++++++-
 5 files changed, 212 insertions(+), 21 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index a18d8e017a..957485ace5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -680,18 +680,18 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
                 continue;
             }
 
-            // If the Connection's destination didn't change, nothing to do
+            // If the Connection's destination didn't change, and the new 
destination is still reachable, nothing to do
             final String destinationVersionId = 
connection.getDestination().getVersionedComponentId().orElse(null);
             final String proposedDestinationId = 
proposedConnection.getDestination().getId();
-            final String destinationGroupVersionId = 
connection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
-            final String proposedDestinationGroupId = 
proposedConnection.getDestination().getGroupId();
-            if (Objects.equals(destinationVersionId, proposedDestinationId) && 
Objects.equals(destinationGroupVersionId, proposedDestinationGroupId)) {
+
+            Connectable newDestination = getConnectable(group, 
proposedConnection.getDestination());
+            final boolean newDestinationReachableFromSource = 
isConnectionDestinationReachable(connection.getSource(), newDestination);
+            if (Objects.equals(destinationVersionId, proposedDestinationId) && 
newDestinationReachableFromSource) {
                 continue;
             }
 
             // Find the destination of the connection. If the destination 
doesn't yet exist (because it's part of the proposed Process Group but not yet 
added),
             // we will set the destination to a temporary destination. Then, 
after adding components, we will update the destinations again.
-            Connectable newDestination = getConnectable(group, 
proposedConnection.getDestination());
             final boolean useTempDestination = 
isTempDestinationNecessary(connection, proposedConnection, newDestination);
             if (useTempDestination) {
                 final Funnel temporaryDestination = 
getTemporaryFunnel(connection.getProcessGroup());
@@ -706,6 +706,29 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
         return connectionsWithTempDestination;
     }
 
+    /**
+     * Checks if a Connection can be made from the given source component to 
the given destination component
+     * @param source the source component
+     * @param destination the destination component
+     * @return true if the connection is allowable, <code>false</code> if the 
connection cannot be made due to the Process Group hierarchies
+     */
+    private boolean isConnectionDestinationReachable(final Connectable source, 
final Connectable destination) {
+        if (source == null || destination == null) {
+            return false;
+        }
+
+        // If the source is an Output Port, the destination must be in the 
parent group, unless the destination is the Input Port of another group
+        if (source.getConnectableType() == ConnectableType.OUTPUT_PORT) {
+            if (destination.getConnectableType() == 
ConnectableType.INPUT_PORT) {
+                return Objects.equals(source.getProcessGroup().getParent(), 
destination.getProcessGroup().getParent());
+            }
+
+            return Objects.equals(source.getProcessGroup().getParent(), 
destination.getProcessGroup());
+        }
+
+        return Objects.equals(source.getProcessGroup(), 
destination.getProcessGroup());
+    }
+
     private boolean isTempDestinationNecessary(final Connection 
existingConnection, final VersionedConnection proposedConnection, final 
Connectable newDestination) {
         if (newDestination == null) {
             return true;
@@ -996,18 +1019,6 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
         }
     }
 
-    private Set<Relationship> getAutoTerminatedRelationships(final 
ProcessorNode processor, final VersionedProcessor proposedProcessor) {
-        final Set<String> relationshipNames = 
proposedProcessor.getAutoTerminatedRelationships();
-        if (relationshipNames == null) {
-            return Collections.emptySet();
-        }
-
-        return relationshipNames.stream()
-            .map(processor::getRelationship)
-            .filter(Objects::nonNull)
-            .collect(Collectors.toSet());
-    }
-
     private void synchronizeRemoteGroups(final ProcessGroup group, final 
VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup> 
rpgsByVersionedId) {
         for (final VersionedRemoteProcessGroup proposedRpg : 
proposed.getRemoteProcessGroups()) {
             final RemoteProcessGroup rpg = 
rpgsByVersionedId.get(proposedRpg.getIdentifier());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 97819083bd..6b25a98a18 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5447,7 +5447,7 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
 
         final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
         final FlowComparator flowComparator = new 
StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new 
StaticDifferenceDescriptor(),
-            Function.identity(), VersionedComponent::getIdentifier, 
FlowComparatorVersionedStrategy.SHALLOW);
+            Function.identity(), VersionedComponent::getIdentifier, 
FlowComparatorVersionedStrategy.DEEP);
         final FlowComparison comparison = flowComparator.compare();
 
         final FlowManager flowManager = controllerFacade.getFlowManager();
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
index 1a8c855039..b66bbd4213 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
@@ -54,6 +54,7 @@ public class StandardCountService extends 
AbstractControllerService implements C
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
         final long startValue = 
Long.parseLong(context.getProperty(START_VALUE).getValue());
+        getLogger().info("Setting counter to {}", startValue);
         counter.set(startValue);
     }
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 0fb983a8f5..32ef406ca8 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -1597,9 +1597,16 @@ public class NiFiClientUtil {
     public VersionControlInformationEntity startVersionControl(final 
ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final 
String bucketId, final String flowName)
             throws NiFiClientException, IOException{
 
+        return publishFlowVersion(group, registryClient, bucketId, flowName, 
null);
+    }
+
+    private VersionControlInformationEntity publishFlowVersion(final 
ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final 
String bucketId, final String flowName,
+                                                               final String 
flowId) throws NiFiClientException, IOException{
+
         final VersionedFlowDTO versionedFlowDto = new VersionedFlowDTO();
         versionedFlowDto.setBucketId(bucketId);
         versionedFlowDto.setFlowName(flowName);
+        versionedFlowDto.setFlowId(flowId);
         versionedFlowDto.setRegistryId(registryClient.getId());
         versionedFlowDto.setAction(VersionedFlowDTO.COMMIT_ACTION);
 
@@ -1610,6 +1617,15 @@ public class NiFiClientUtil {
         return 
nifiClient.getVersionsClient().startVersionControl(group.getId(), 
requestEntity);
     }
 
+    public VersionControlInformationEntity saveFlowVersion(final 
ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final 
VersionControlInformationEntity currentVci)
+            throws NiFiClientException, IOException {
+
+        final VersionControlInformationDTO currentDto = 
currentVci.getVersionControlInformation();
+        return publishFlowVersion(group, registryClient, 
currentDto.getBucketId(), currentDto.getFlowName(), currentDto.getFlowId());
+    }
+
+
+
     public VersionedFlowUpdateRequestEntity revertChanges(final 
ProcessGroupEntity group) throws NiFiClientException, IOException, 
InterruptedException {
         final VersionControlInformationEntity vciEntity = 
nifiClient.getVersionsClient().getVersionControlInfo(group.getId());
         final VersionedFlowUpdateRequestEntity revertRequest = 
nifiClient.getVersionsClient().initiateRevertFlowVersion(group.getId(), 
vciEntity);
@@ -1655,6 +1671,12 @@ public class NiFiClientUtil {
     }
 
     public VersionedFlowUpdateRequestEntity changeFlowVersion(final String 
processGroupId, final int version) throws NiFiClientException, IOException, 
InterruptedException {
+        return changeFlowVersion(processGroupId, version, true);
+    }
+
+    public VersionedFlowUpdateRequestEntity changeFlowVersion(final String 
processGroupId, final int version, final boolean throwOnFailure)
+                throws NiFiClientException, IOException, InterruptedException {
+
         final ProcessGroupEntity groupEntity = 
nifiClient.getProcessGroupClient().getProcessGroup(processGroupId);
         final ProcessGroupDTO groupDto = groupEntity.getComponent();
         final VersionControlInformationDTO vciDto = 
groupDto.getVersionControlInformation();
@@ -1669,14 +1691,18 @@ public class NiFiClientUtil {
         requestEntity.setVersionControlInformation(vciDto);
 
         final VersionedFlowUpdateRequestEntity result = 
nifiClient.getVersionsClient().updateVersionControlInfo(processGroupId, 
requestEntity);
-        return 
waitForVersionFlowUpdateComplete(result.getRequest().getRequestId());
+        return 
waitForVersionFlowUpdateComplete(result.getRequest().getRequestId(), 
throwOnFailure);
     }
 
-    public VersionedFlowUpdateRequestEntity 
waitForVersionFlowUpdateComplete(final String updateRequestId) throws 
NiFiClientException, IOException, InterruptedException {
+    public VersionedFlowUpdateRequestEntity 
waitForVersionFlowUpdateComplete(final String updateRequestId, final boolean 
throwOnFailure) throws NiFiClientException, IOException, InterruptedException {
         while (true) {
             final VersionedFlowUpdateRequestEntity result = 
nifiClient.getVersionsClient().getUpdateRequest(updateRequestId);
             final boolean complete = result.getRequest().isComplete();
             if (complete) {
+                if (throwOnFailure && result.getRequest().getFailureReason() 
!= null) {
+                    throw new RuntimeException("Version Flow Update request 
failed due to: " + result.getRequest().getFailureReason());
+                }
+
                 return 
nifiClient.getVersionsClient().deleteUpdateRequest(updateRequestId);
             }
 
@@ -1739,4 +1765,10 @@ public class NiFiClientUtil {
 
         return 
nifiClient.getProcessGroupClient().copySnippet(destinationGroupId, 
requestEntity);
     }
+
+    public ConnectionEntity setFifoPrioritizer(final ConnectionEntity 
connectionEntity) throws NiFiClientException, IOException {
+        final ConnectionDTO connectionDto = connectionEntity.getComponent();
+        
connectionDto.setPrioritizers(Collections.singletonList("org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer"));
+        return 
nifiClient.getConnectionClient().updateConnection(connectionEntity);
+    }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
index 40bf8c32b7..195fe07fc5 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.tests.system.registry;
 
 import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.tests.system.NiFiClientUtil;
 import org.apache.nifi.tests.system.NiFiSystemIT;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@@ -25,8 +26,11 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
 import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
@@ -40,6 +44,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -52,6 +57,148 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RegistryClientIT extends NiFiSystemIT {
 
+    /**
+     * Test a scenario where we have Parent Process Group with a child process 
group. The child group is under Version Control.
+     * Then the parent is placed under Version Control. Then modify a 
Processor in child. Register snapshot for child, then for parent.
+     * Then start Flow.
+     * Then change between versions at the Parent level while the flow is 
stopped and while it's running.
+     */
+    @Test
+    public void testChangeVersionOnParentThatCascadesToChild() throws 
NiFiClientException, IOException, InterruptedException {
+        final FlowRegistryClientEntity clientEntity = registerClient();
+        final NiFiClientUtil util = getClientUtil();
+
+        final ProcessGroupEntity parent = util.createProcessGroup("Parent", 
"root");
+        final ProcessGroupEntity child = util.createProcessGroup("Child", 
parent.getId());
+        final PortEntity inputPort = util.createInputPort("Input Port", 
child.getId());
+        final PortEntity outputPort = util.createOutputPort("Output Port", 
child.getId());
+        final ProcessorEntity updateContents = 
util.createProcessor("UpdateContent", child.getId());
+        util.updateProcessorProperties(updateContents, 
Collections.singletonMap("Content", "Updated"));
+
+        util.createConnection(inputPort, updateContents);
+        util.createConnection(updateContents, outputPort, "success");
+
+        final ProcessorEntity generate = 
util.createProcessor("GenerateFlowFile", parent.getId());
+        util.updateProcessorProperties(generate, 
Collections.singletonMap("Text", "Hello World"));
+        util.createConnection(generate, inputPort, "success");
+
+        final ProcessorEntity terminate = 
util.createProcessor("TerminateFlowFile", parent.getId());
+        final ConnectionEntity connectionToTerminate = 
util.createConnection(outputPort, terminate);
+
+        final VersionControlInformationEntity childVci = 
util.startVersionControl(child, clientEntity, 
"testChangeVersionOnParentThatCascadesToChild", "Child");
+        final VersionControlInformationEntity parentVci = 
util.startVersionControl(parent, clientEntity, 
"testChangeVersionOnParentThatCascadesToChild", "Parent");
+
+        // Change the properties of the UpdateContent processor and commit as 
v2
+        util.updateProcessorProperties(updateContents, 
Collections.singletonMap("Content", "Updated v2"));
+
+        util.saveFlowVersion(child, clientEntity, childVci);
+        util.saveFlowVersion(parent, clientEntity, parentVci);
+
+        // Ensure that we have the correct state
+        util.assertFlowUpToDate(parent.getId());
+        util.assertFlowUpToDate(child.getId());
+
+        // Verify that we are able to switch back to v1 while everything is 
stopped
+        util.changeFlowVersion(parent.getId(), 1);
+        util.assertFlowStaleAndUnmodified(parent.getId());
+        util.assertFlowStaleAndUnmodified(child.getId());
+
+        // Start the flow and verify the contents of the flow file
+        util.waitForValidProcessor(updateContents.getId());
+        util.waitForValidProcessor(generate.getId());
+        util.startProcessGroupComponents(child.getId());
+        util.startProcessor(generate);
+
+        waitForQueueCount(connectionToTerminate.getId(), getNumberOfNodes());
+
+        final String contents = 
util.getFlowFileContentAsUtf8(connectionToTerminate.getId(), 0);
+        assertEquals("Updated", contents);
+
+        // Switch Version back to v2 while it's running
+        util.changeFlowVersion(parent.getId(), 2);
+        util.assertFlowUpToDate(parent.getId());
+        util.assertFlowUpToDate(child.getId());
+
+        // With the flow running on v2, restart GenerateFlowFile 
to trigger another FlowFile to be generated
+        util.stopProcessor(generate);
+        util.startProcessor(generate);
+        waitForQueueCount(connectionToTerminate.getId(), 2 * 
getNumberOfNodes());
+
+        // Ensure that the contents are correct
+        final String secondFlowFileContents = 
util.getFlowFileContentAsUtf8(connectionToTerminate.getId(), 
getNumberOfNodes());
+        assertEquals("Updated v2", secondFlowFileContents);
+
+        // Switch back to v1 while flow is running to verify that the version 
can change back to a lower version as well
+        util.changeFlowVersion(parent.getId(), 1);
+        util.assertFlowStaleAndUnmodified(parent.getId());
+        util.assertFlowStaleAndUnmodified(child.getId());
+
+        util.stopProcessor(generate);
+        util.startProcessor(generate);
+        waitForQueueCount(connectionToTerminate.getId(), 3 * 
getNumberOfNodes());
+
+        final String thirdFlowFileContents = 
util.getFlowFileContentAsUtf8(connectionToTerminate.getId(), getNumberOfNodes() 
* 2);
+        assertEquals("Updated", thirdFlowFileContents);
+    }
+
+
+    @Test
+    public void testControllerServiceUpdateWhileRunning() throws 
NiFiClientException, IOException, InterruptedException {
+        final FlowRegistryClientEntity clientEntity = registerClient();
+        final NiFiClientUtil util = getClientUtil();
+
+        final ProcessGroupEntity group = util.createProcessGroup("Parent", 
"root");
+        final ControllerServiceEntity service = 
util.createControllerService("StandardCountService", group.getId());
+
+        final ProcessorEntity generate = 
util.createProcessor("GenerateFlowFile", group.getId());
+        final ProcessorEntity countProcessor = 
util.createProcessor("CountFlowFiles", group.getId());
+        util.updateProcessorProperties(countProcessor, 
Collections.singletonMap("Count Service", service.getComponent().getId()));
+
+        final ProcessorEntity terminate = 
util.createProcessor("TerminateFlowFile", group.getId());
+        final ConnectionEntity connectionToTerminate = 
util.createConnection(countProcessor, terminate, "success");
+        util.setFifoPrioritizer(connectionToTerminate);
+        util.createConnection(generate, countProcessor, "success");
+
+        // Save the flow as v1
+        final VersionControlInformationEntity vci = 
util.startVersionControl(group, clientEntity, 
"testControllerServiceUpdateWhileRunning", "Parent");
+
+        // Change the Controller Service's start value to 
2000, and change the text of the GenerateFlowFile just to make it run each time 
the version is changed
+        util.updateControllerServiceProperties(service, 
Collections.singletonMap("Start Value", "2000"));
+        util.updateProcessorProperties(generate, 
Collections.singletonMap("Text", "Hello World"));
+
+        // Save the flow as v2
+        util.saveFlowVersion(group, clientEntity, vci);
+
+        // Change back to v1 and start the flow
+        util.changeFlowVersion(group.getId(), 1);
+        util.assertFlowStaleAndUnmodified(group.getId());
+        util.enableControllerService(service);
+
+        util.waitForValidProcessor(generate.getId());
+        util.startProcessor(generate);
+        util.waitForValidProcessor(countProcessor.getId());
+        util.startProcessor(countProcessor);
+
+        // Ensure that we get the expected result
+        waitForQueueCount(connectionToTerminate.getId(), getNumberOfNodes());
+        final Map<String, String> firstFlowFileAttributes = 
util.getQueueFlowFile(connectionToTerminate.getId(), 
0).getFlowFile().getAttributes();
+        assertEquals("1", firstFlowFileAttributes.get("count"));
+
+        // Change to v2 and ensure that the output is correct
+        util.changeFlowVersion(group.getId(), 2);
+        util.assertFlowUpToDate(group.getId());
+        waitForQueueCount(connectionToTerminate.getId(), 2 * 
getNumberOfNodes());
+        final Map<String, String> secondFlowFileAttributes = 
util.getQueueFlowFile(connectionToTerminate.getId(), 
getNumberOfNodes()).getFlowFile().getAttributes();
+        assertEquals("2001", secondFlowFileAttributes.get("count"));
+
+        // Change back to v1 and ensure that the output is correct. It should 
reset count back to 0.
+        util.changeFlowVersion(group.getId(), 1);
+        util.assertFlowStaleAndUnmodified(group.getId());
+        waitForQueueCount(connectionToTerminate.getId(), 3 * 
getNumberOfNodes());
+        final Map<String, String> thirdFlowFileAttributes = 
util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes() * 
2).getFlowFile().getAttributes();
+        assertEquals("1", thirdFlowFileAttributes.get("count"));
+    }
+
     @Test
     public void testChangeVersionWithPortMoveBetweenGroups() throws 
NiFiClientException, IOException, InterruptedException {
         final FlowRegistryClientEntity clientEntity = registerClient(new 
File("src/test/resources/versioned-flows"));
@@ -107,7 +254,7 @@ public class RegistryClientIT extends NiFiSystemIT {
         assertNotNull(imported);
         getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
 
-        final VersionedFlowUpdateRequestEntity version2Result = 
getClientUtil().changeFlowVersion(imported.getId(), 2);
+        final VersionedFlowUpdateRequestEntity version2Result = 
getClientUtil().changeFlowVersion(imported.getId(), 2, false);
         final String failureReason = 
version2Result.getRequest().getFailureReason();
         assertNotNull(failureReason);
 

Reply via email to