This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 451941aad9 NIFI-11251: Fixed case in which changing Flow Version was
improperly attempting to use a Temporary Funnel destination for a connection
due to differences in Group ID by instead checking if the destination is
reachable; fixed issue in which processors in an inner versioned group were not
stopped even though they changed, when a higher-level parent (that was itself
under version control) had its version changed. Wrote system tests to verify
451941aad9 is described below
commit 451941aad921f1c15bf65bad8bd58d8a1f7bb62e
Author: Mark Payne <[email protected]>
AuthorDate: Tue Mar 28 15:23:26 2023 -0400
NIFI-11251: Fixed case in which changing Flow Version was improperly
attempting to use a Temporary Funnel destination for a connection due to
differences in Group ID by instead checking if the destination is reachable;
fixed issue in which processors in an inner versioned group were not stopped
even though they changed, when a higher-level parent (that was itself under
version control) had its version changed. Wrote system tests to verify
- Fixed system tests so that they work properly in Clustered version of
RegistryClientIT
- Fixed system test - ensure that we wait for processors to become valid
before attempting to start them; also added an additional system test around
Controller Services in versioned flows
This closes #7095
Signed-off-by: David Handermann <[email protected]>
---
.../StandardVersionedComponentSynchronizer.java | 45 ++++---
.../apache/nifi/web/StandardNiFiServiceFacade.java | 2 +-
.../nifi/cs/tests/system/StandardCountService.java | 1 +
.../apache/nifi/tests/system/NiFiClientUtil.java | 36 ++++-
.../tests/system/registry/RegistryClientIT.java | 149 ++++++++++++++++++++-
5 files changed, 212 insertions(+), 21 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index a18d8e017a..957485ace5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -680,18 +680,18 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
continue;
}
- // If the Connection's destination didn't change, nothing to do
+ // If the Connection's destination didn't change, and the new is
still reachable, nothing to do
final String destinationVersionId =
connection.getDestination().getVersionedComponentId().orElse(null);
final String proposedDestinationId =
proposedConnection.getDestination().getId();
- final String destinationGroupVersionId =
connection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
- final String proposedDestinationGroupId =
proposedConnection.getDestination().getGroupId();
- if (Objects.equals(destinationVersionId, proposedDestinationId) &&
Objects.equals(destinationGroupVersionId, proposedDestinationGroupId)) {
+
+ Connectable newDestination = getConnectable(group,
proposedConnection.getDestination());
+ final boolean newDestinationReachableFromSource =
isConnectionDestinationReachable(connection.getSource(), newDestination);
+ if (Objects.equals(destinationVersionId, proposedDestinationId) &&
newDestinationReachableFromSource) {
continue;
}
// Find the destination of the connection. If the destination
doesn't yet exist (because it's part of the proposed Process Group but not yet
added),
// we will set the destination to a temporary destination. Then,
after adding components, we will update the destinations again.
- Connectable newDestination = getConnectable(group,
proposedConnection.getDestination());
final boolean useTempDestination =
isTempDestinationNecessary(connection, proposedConnection, newDestination);
if (useTempDestination) {
final Funnel temporaryDestination =
getTemporaryFunnel(connection.getProcessGroup());
@@ -706,6 +706,29 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
return connectionsWithTempDestination;
}
+ /**
+ * Checks if a Connection can be made from the given source component to
the given destination component
+ * @param source the source component
+ * @param destination the destination component
+ * @return true if the connection is allowable, <code>false</code> if the
connection cannot be made due to the Process Group hierarchies
+ */
+ private boolean isConnectionDestinationReachable(final Connectable source,
final Connectable destination) {
+ if (source == null || destination == null) {
+ return false;
+ }
+
+ // If the source is an Output Port, the destination must be in the
parent group, unless the destination is the Input Port of another group
+ if (source.getConnectableType() == ConnectableType.OUTPUT_PORT) {
+ if (destination.getConnectableType() ==
ConnectableType.INPUT_PORT) {
+ return Objects.equals(source.getProcessGroup().getParent(),
destination.getProcessGroup().getParent());
+ }
+
+ return Objects.equals(source.getProcessGroup().getParent(),
destination.getProcessGroup());
+ }
+
+ return Objects.equals(source.getProcessGroup(),
destination.getProcessGroup());
+ }
+
private boolean isTempDestinationNecessary(final Connection
existingConnection, final VersionedConnection proposedConnection, final
Connectable newDestination) {
if (newDestination == null) {
return true;
@@ -996,18 +1019,6 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
}
}
- private Set<Relationship> getAutoTerminatedRelationships(final
ProcessorNode processor, final VersionedProcessor proposedProcessor) {
- final Set<String> relationshipNames =
proposedProcessor.getAutoTerminatedRelationships();
- if (relationshipNames == null) {
- return Collections.emptySet();
- }
-
- return relationshipNames.stream()
- .map(processor::getRelationship)
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
- }
-
private void synchronizeRemoteGroups(final ProcessGroup group, final
VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup>
rpgsByVersionedId) {
for (final VersionedRemoteProcessGroup proposedRpg :
proposed.getRemoteProcessGroups()) {
final RemoteProcessGroup rpg =
rpgsByVersionedId.get(proposedRpg.getIdentifier());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 9cb0f94e50..c0f4a17c98 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5447,7 +5447,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
final FlowComparator flowComparator = new
StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new
StaticDifferenceDescriptor(),
- Function.identity(), VersionedComponent::getIdentifier,
FlowComparatorVersionedStrategy.SHALLOW);
+ Function.identity(), VersionedComponent::getIdentifier,
FlowComparatorVersionedStrategy.DEEP);
final FlowComparison comparison = flowComparator.compare();
final FlowManager flowManager = controllerFacade.getFlowManager();
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
index 1a8c855039..b66bbd4213 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
@@ -54,6 +54,7 @@ public class StandardCountService extends
AbstractControllerService implements C
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final long startValue =
Long.parseLong(context.getProperty(START_VALUE).getValue());
+ getLogger().info("Setting counter to {}", startValue);
counter.set(startValue);
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 0fb983a8f5..32ef406ca8 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -1597,9 +1597,16 @@ public class NiFiClientUtil {
public VersionControlInformationEntity startVersionControl(final
ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final
String bucketId, final String flowName)
throws NiFiClientException, IOException{
+ return publishFlowVersion(group, registryClient, bucketId, flowName,
null);
+ }
+
+ private VersionControlInformationEntity publishFlowVersion(final
ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final
String bucketId, final String flowName,
+ final String
flowId) throws NiFiClientException, IOException{
+
final VersionedFlowDTO versionedFlowDto = new VersionedFlowDTO();
versionedFlowDto.setBucketId(bucketId);
versionedFlowDto.setFlowName(flowName);
+ versionedFlowDto.setFlowId(flowId);
versionedFlowDto.setRegistryId(registryClient.getId());
versionedFlowDto.setAction(VersionedFlowDTO.COMMIT_ACTION);
@@ -1610,6 +1617,15 @@ public class NiFiClientUtil {
return
nifiClient.getVersionsClient().startVersionControl(group.getId(),
requestEntity);
}
+ public VersionControlInformationEntity saveFlowVersion(final
ProcessGroupEntity group, final FlowRegistryClientEntity registryClient, final
VersionControlInformationEntity currentVci)
+ throws NiFiClientException, IOException {
+
+ final VersionControlInformationDTO currentDto =
currentVci.getVersionControlInformation();
+ return publishFlowVersion(group, registryClient,
currentDto.getBucketId(), currentDto.getFlowName(), currentDto.getFlowId());
+ }
+
+
+
public VersionedFlowUpdateRequestEntity revertChanges(final
ProcessGroupEntity group) throws NiFiClientException, IOException,
InterruptedException {
final VersionControlInformationEntity vciEntity =
nifiClient.getVersionsClient().getVersionControlInfo(group.getId());
final VersionedFlowUpdateRequestEntity revertRequest =
nifiClient.getVersionsClient().initiateRevertFlowVersion(group.getId(),
vciEntity);
@@ -1655,6 +1671,12 @@ public class NiFiClientUtil {
}
public VersionedFlowUpdateRequestEntity changeFlowVersion(final String
processGroupId, final int version) throws NiFiClientException, IOException,
InterruptedException {
+ return changeFlowVersion(processGroupId, version, true);
+ }
+
+ public VersionedFlowUpdateRequestEntity changeFlowVersion(final String
processGroupId, final int version, final boolean throwOnFailure)
+ throws NiFiClientException, IOException, InterruptedException {
+
final ProcessGroupEntity groupEntity =
nifiClient.getProcessGroupClient().getProcessGroup(processGroupId);
final ProcessGroupDTO groupDto = groupEntity.getComponent();
final VersionControlInformationDTO vciDto =
groupDto.getVersionControlInformation();
@@ -1669,14 +1691,18 @@ public class NiFiClientUtil {
requestEntity.setVersionControlInformation(vciDto);
final VersionedFlowUpdateRequestEntity result =
nifiClient.getVersionsClient().updateVersionControlInfo(processGroupId,
requestEntity);
- return
waitForVersionFlowUpdateComplete(result.getRequest().getRequestId());
+ return
waitForVersionFlowUpdateComplete(result.getRequest().getRequestId(),
throwOnFailure);
}
- public VersionedFlowUpdateRequestEntity
waitForVersionFlowUpdateComplete(final String updateRequestId) throws
NiFiClientException, IOException, InterruptedException {
+ public VersionedFlowUpdateRequestEntity
waitForVersionFlowUpdateComplete(final String updateRequestId, final boolean
throwOnFailure) throws NiFiClientException, IOException, InterruptedException {
while (true) {
final VersionedFlowUpdateRequestEntity result =
nifiClient.getVersionsClient().getUpdateRequest(updateRequestId);
final boolean complete = result.getRequest().isComplete();
if (complete) {
+ if (throwOnFailure && result.getRequest().getFailureReason()
!= null) {
+ throw new RuntimeException("Version Flow Update request
failed due to: " + result.getRequest().getFailureReason());
+ }
+
return
nifiClient.getVersionsClient().deleteUpdateRequest(updateRequestId);
}
@@ -1739,4 +1765,10 @@ public class NiFiClientUtil {
return
nifiClient.getProcessGroupClient().copySnippet(destinationGroupId,
requestEntity);
}
+
+ public ConnectionEntity setFifoPrioritizer(final ConnectionEntity
connectionEntity) throws NiFiClientException, IOException {
+ final ConnectionDTO connectionDto = connectionEntity.getComponent();
+
connectionDto.setPrioritizers(Collections.singletonList("org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer"));
+ return
nifiClient.getConnectionClient().updateConnection(connectionEntity);
+ }
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
index 40bf8c32b7..195fe07fc5 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
@@ -18,6 +18,7 @@
package org.apache.nifi.tests.system.registry;
import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.tests.system.NiFiClientUtil;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@@ -25,8 +26,11 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
@@ -40,6 +44,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -52,6 +57,148 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RegistryClientIT extends NiFiSystemIT {
+ /**
+ * Test a scenario where we have Parent Process Group with a child process
group. The child group is under Version Control.
+ * Then the parent is placed under Version Control. Then modify a
Processor in child. Register snapshot for child, then for parent.
+ * Then start Flow.
+ * Then change between versions at the Parent level while the flow is
stopped and while it's running.
+ */
+ @Test
+ public void testChangeVersionOnParentThatCascadesToChild() throws
NiFiClientException, IOException, InterruptedException {
+ final FlowRegistryClientEntity clientEntity = registerClient();
+ final NiFiClientUtil util = getClientUtil();
+
+ final ProcessGroupEntity parent = util.createProcessGroup("Parent",
"root");
+ final ProcessGroupEntity child = util.createProcessGroup("Child",
parent.getId());
+ final PortEntity inputPort = util.createInputPort("Input Port",
child.getId());
+ final PortEntity outputPort = util.createOutputPort("Output Port",
child.getId());
+ final ProcessorEntity updateContents =
util.createProcessor("UpdateContent", child.getId());
+ util.updateProcessorProperties(updateContents,
Collections.singletonMap("Content", "Updated"));
+
+ util.createConnection(inputPort, updateContents);
+ util.createConnection(updateContents, outputPort, "success");
+
+ final ProcessorEntity generate =
util.createProcessor("GenerateFlowFile", parent.getId());
+ util.updateProcessorProperties(generate,
Collections.singletonMap("Text", "Hello World"));
+ util.createConnection(generate, inputPort, "success");
+
+ final ProcessorEntity terminate =
util.createProcessor("TerminateFlowFile", parent.getId());
+ final ConnectionEntity connectionToTerminate =
util.createConnection(outputPort, terminate);
+
+ final VersionControlInformationEntity childVci =
util.startVersionControl(child, clientEntity,
"testChangeVersionOnParentThatCascadesToChild", "Child");
+ final VersionControlInformationEntity parentVci =
util.startVersionControl(parent, clientEntity,
"testChangeVersionOnParentThatCascadesToChild", "Parent");
+
+ // Change the properties of the UpdateContent processor and commit as
v2
+ util.updateProcessorProperties(updateContents,
Collections.singletonMap("Content", "Updated v2"));
+
+ util.saveFlowVersion(child, clientEntity, childVci);
+ util.saveFlowVersion(parent, clientEntity, parentVci);
+
+ // Ensure that we have the correct state
+ util.assertFlowUpToDate(parent.getId());
+ util.assertFlowUpToDate(child.getId());
+
+ // Verify that we are able to switch back to v1 while everything is
stopped
+ util.changeFlowVersion(parent.getId(), 1);
+ util.assertFlowStaleAndUnmodified(parent.getId());
+ util.assertFlowStaleAndUnmodified(child.getId());
+
+ // Start the flow and verify the contents of the flow file
+ util.waitForValidProcessor(updateContents.getId());
+ util.waitForValidProcessor(generate.getId());
+ util.startProcessGroupComponents(child.getId());
+ util.startProcessor(generate);
+
+ waitForQueueCount(connectionToTerminate.getId(), getNumberOfNodes());
+
+ final String contents =
util.getFlowFileContentAsUtf8(connectionToTerminate.getId(), 0);
+ assertEquals("Updated", contents);
+
+ // Switch Version back to v2 while it's running
+ util.changeFlowVersion(parent.getId(), 2);
+ util.assertFlowUpToDate(parent.getId());
+ util.assertFlowUpToDate(child.getId());
+
+ // With flow running, change version to v1. Restart GenerateFlowFile
to trigger another FlowFile to be generated
+ util.stopProcessor(generate);
+ util.startProcessor(generate);
+ waitForQueueCount(connectionToTerminate.getId(), 2 *
getNumberOfNodes());
+
+ // Ensure that the contents are correct
+ final String secondFlowFileContents =
util.getFlowFileContentAsUtf8(connectionToTerminate.getId(),
getNumberOfNodes());
+ assertEquals("Updated v2", secondFlowFileContents);
+
+ // Switch back to v1 while flow is running to verify that the version
can change back to a lower version as well
+ util.changeFlowVersion(parent.getId(), 1);
+ util.assertFlowStaleAndUnmodified(parent.getId());
+ util.assertFlowStaleAndUnmodified(child.getId());
+
+ util.stopProcessor(generate);
+ util.startProcessor(generate);
+ waitForQueueCount(connectionToTerminate.getId(), 3 *
getNumberOfNodes());
+
+ final String thirdFlowFileContents =
util.getFlowFileContentAsUtf8(connectionToTerminate.getId(), getNumberOfNodes()
* 2);
+ assertEquals("Updated", thirdFlowFileContents);
+ }
+
+
+ @Test
+ public void testControllerServiceUpdateWhileRunning() throws
NiFiClientException, IOException, InterruptedException {
+ final FlowRegistryClientEntity clientEntity = registerClient();
+ final NiFiClientUtil util = getClientUtil();
+
+ final ProcessGroupEntity group = util.createProcessGroup("Parent",
"root");
+ final ControllerServiceEntity service =
util.createControllerService("StandardCountService", group.getId());
+
+ final ProcessorEntity generate =
util.createProcessor("GenerateFlowFile", group.getId());
+ final ProcessorEntity countProcessor =
util.createProcessor("CountFlowFiles", group.getId());
+ util.updateProcessorProperties(countProcessor,
Collections.singletonMap("Count Service", service.getComponent().getId()));
+
+ final ProcessorEntity terminate =
util.createProcessor("TerminateFlowFile", group.getId());
+ final ConnectionEntity connectionToTerminate =
util.createConnection(countProcessor, terminate, "success");
+ util.setFifoPrioritizer(connectionToTerminate);
+ util.createConnection(generate, countProcessor, "success");
+
+ // Save the flow as v1
+ final VersionControlInformationEntity vci =
util.startVersionControl(group, clientEntity,
"testControllerServiceUpdateWhileRunning", "Parent");
+
+ // Change the value of of the Controller Service's start value to
2000, and change the text of the GenerateFlowFile just to make it run each time
the version is changed
+ util.updateControllerServiceProperties(service,
Collections.singletonMap("Start Value", "2000"));
+ util.updateProcessorProperties(generate,
Collections.singletonMap("Text", "Hello World"));
+
+ // Save the flow as v2
+ util.saveFlowVersion(group, clientEntity, vci);
+
+ // Change back to v1 and start the flow
+ util.changeFlowVersion(group.getId(), 1);
+ util.assertFlowStaleAndUnmodified(group.getId());
+ util.enableControllerService(service);
+
+ util.waitForValidProcessor(generate.getId());
+ util.startProcessor(generate);
+ util.waitForValidProcessor(countProcessor.getId());
+ util.startProcessor(countProcessor);
+
+ // Ensure that we get the expected result
+ waitForQueueCount(connectionToTerminate.getId(), getNumberOfNodes());
+ final Map<String, String> firstFlowFileAttributes =
util.getQueueFlowFile(connectionToTerminate.getId(),
0).getFlowFile().getAttributes();
+ assertEquals("1", firstFlowFileAttributes.get("count"));
+
+ // Change to v2 and ensure that the output is correct
+ util.changeFlowVersion(group.getId(), 2);
+ util.assertFlowUpToDate(group.getId());
+ waitForQueueCount(connectionToTerminate.getId(), 2 *
getNumberOfNodes());
+ final Map<String, String> secondFlowFileAttributes =
util.getQueueFlowFile(connectionToTerminate.getId(),
getNumberOfNodes()).getFlowFile().getAttributes();
+ assertEquals("2001", secondFlowFileAttributes.get("count"));
+
+ // Change back to v1 and ensure that the output is correct. It should
reset count back to 0.
+ util.changeFlowVersion(group.getId(), 1);
+ util.assertFlowStaleAndUnmodified(group.getId());
+ waitForQueueCount(connectionToTerminate.getId(), 3 *
getNumberOfNodes());
+ final Map<String, String> thirdFlowFileAttributes =
util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes() *
2).getFlowFile().getAttributes();
+ assertEquals("1", thirdFlowFileAttributes.get("count"));
+ }
+
@Test
public void testChangeVersionWithPortMoveBetweenGroups() throws
NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new
File("src/test/resources/versioned-flows"));
@@ -107,7 +254,7 @@ public class RegistryClientIT extends NiFiSystemIT {
assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
- final VersionedFlowUpdateRequestEntity version2Result =
getClientUtil().changeFlowVersion(imported.getId(), 2);
+ final VersionedFlowUpdateRequestEntity version2Result =
getClientUtil().changeFlowVersion(imported.getId(), 2, false);
final String failureReason =
version2Result.getRequest().getFailureReason();
assertNotNull(failureReason);