adenes commented on a change in pull request #5286:
URL: https://github.com/apache/nifi/pull/5286#discussion_r686612419
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
##########
@@ -384,7 +391,67 @@ public void
testChangeVersionFromExplicitValueToParameterSensitiveProperty() {
assertEquals("#{secret-param}",
nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());
}
+ @Test
+ public void testUpdateFlowWithInputPortMovedFromGroupAToGroupB() {
+ //Testing use case NIFI-9018
+ //Create Process Group A
+ final ProcessGroup groupA = createProcessGroup("group-a-id", "Group
A", getRootGroup());
+
+ //Add Input Port to Process Group A
+ final Port port =
getFlowController().getFlowManager().createLocalInputPort("input-port-id",
"Input Port");
+ groupA.addInputPort(port);
+
+ //Create a snapshot
+ final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+ //Create Process Group B under Process Group A
+ final ProcessGroup groupB = createProcessGroup("group-b-id", "Group
B", groupA);
+
+ //Move Input Port from Process Group A to Process Group B
+ moveInputPort(port, groupB);
+
+ //Create Processor under Process Group A
+ final ProcessorNode processor =
createProcessorNode(GenerateProcessor.class, groupA);
+
+ //Create Connection between Processor in Process Group A and Input
Port in Process Group B
+ final Connection connection = connect(groupA, processor, port,
processor.getRelationships());
+ //Create another snapshot
+ final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+ //Change Process Group A version to Version 1
+ groupA.updateFlow(version1, null, false, true, true);
+
+ //Process Group A should have only one Input Port and no Process
Groups, Processors or Connections
+ assertTrue(groupA.getProcessGroups().isEmpty());
+ assertTrue(groupA.getProcessors().isEmpty());
+ assertTrue(groupA.getConnections().isEmpty());
+ assertEquals(1L, groupA.getInputPorts().stream().filter(p ->
p.getVersionedComponentId().get().equals(port.getVersionedComponentId().get())).count());
+
+ //Change Process Group A version to Version 2
+ groupA.updateFlow(version2, null, false, true, true);
+
+ //Process Group A should have a Process Group, a Processor and a
Connection and no Input Ports
+ assertEquals(1L, groupA.getProcessGroups().stream().filter(pg ->
pg.getVersionedComponentId().get().equals(groupB.getVersionedComponentId().get())).count());
+ assertEquals(1L, groupA.getProcessors().stream().filter(pr ->
pr.getVersionedComponentId().get().equals(processor.getVersionedComponentId().get())).count());
+ assertEquals(1L, groupA.getConnections().stream().filter(c ->
c.getVersionedComponentId().get().equals(connection.getVersionedComponentId().get())).count());
Review comment:
Same as above: this could be simplified to improve readability.
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
##########
@@ -408,59 +475,82 @@ public void
testChangeVersionFromExplicitValueToParameterSensitiveProperty() {
return differences;
}
- private VersionedFlowSnapshot createFlowSnapshot(final
List<ControllerServiceNode> controllerServices, final List<ProcessorNode>
processors, final Set<Parameter> parameters) {
- final VersionedFlowSnapshotMetadata snapshotMetadata = new
VersionedFlowSnapshotMetadata();
- snapshotMetadata.setAuthor("unit-test");
- snapshotMetadata.setBucketIdentifier("unit-test-bucket");
- snapshotMetadata.setFlowIdentifier("unit-test-flow");
- snapshotMetadata.setTimestamp(System.currentTimeMillis());
- snapshotMetadata.setVersion(1);
+ private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group,
final List<ControllerServiceNode> controllerServices,
+ final List<ProcessorNode>
processors, final Set<Parameter> parameters) {
+ final VersionedFlowSnapshotMetadata snapshotMetadata =
createSnapshotMetadata();
- final Bucket bucket = new Bucket();
- bucket.setCreatedTimestamp(System.currentTimeMillis());
- bucket.setIdentifier("unit-test-bucket");
- bucket.setName("Unit Test Bucket");
+ final Bucket bucket = createBucket();
- final VersionedFlow flow = new VersionedFlow();
- flow.setBucketIdentifier("unit-test-bucket");
- flow.setBucketName("Unit Test Bucket");
- flow.setCreatedTimestamp(System.currentTimeMillis());
- flow.setIdentifier("unit-test-flow");
- flow.setName("Unit Test Flow");
+ final VersionedFlow flow = createVersionedFlow();
- final BundleCoordinate coordinate =
getSystemBundle().getBundleDetails().getCoordinate();
- final Bundle bundle = new Bundle();
- bundle.setArtifact(coordinate.getId());
- bundle.setGroup(coordinate.getGroup());
- bundle.setVersion(coordinate.getVersion());
+ createBundle();
final NiFiRegistryFlowMapper flowMapper = new
NiFiRegistryFlowMapper(getExtensionManager());
+ final List<ProcessorNode> processorNodes;
+ final List<ControllerServiceNode> controllerServiceNodes;
+ final List<Port> inputPorts;
+ final List<Connection> connections;
+ final List<ProcessGroup> processGroups;
+ final Set<VersionedProcessGroup> versionedProcessGroups;
+
+ if (group == null) {
+ processorNodes = processors;
+ controllerServiceNodes = controllerServices;
+ inputPorts = Collections.EMPTY_LIST;
+ connections = Collections.EMPTY_LIST;
+ versionedProcessGroups = Collections.EMPTY_SET;
+ } else {
+ processorNodes =
group.getProcessors().stream().collect(Collectors.toList());
+ controllerServiceNodes =
group.getControllerServices(false).stream().collect(Collectors.toList());
+ inputPorts =
group.getInputPorts().stream().collect(Collectors.toList());
+ connections =
group.getConnections().stream().collect(Collectors.toList());
+ processGroups =
group.getProcessGroups().stream().collect(Collectors.toList());
Review comment:
Wouldn't it be easier to understand if this used `new
ArrayList<>(group.getProcessors())`, and likewise for `getControllerServices()`, etc.?
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
##########
@@ -408,59 +475,82 @@ public void
testChangeVersionFromExplicitValueToParameterSensitiveProperty() {
return differences;
}
- private VersionedFlowSnapshot createFlowSnapshot(final
List<ControllerServiceNode> controllerServices, final List<ProcessorNode>
processors, final Set<Parameter> parameters) {
- final VersionedFlowSnapshotMetadata snapshotMetadata = new
VersionedFlowSnapshotMetadata();
- snapshotMetadata.setAuthor("unit-test");
- snapshotMetadata.setBucketIdentifier("unit-test-bucket");
- snapshotMetadata.setFlowIdentifier("unit-test-flow");
- snapshotMetadata.setTimestamp(System.currentTimeMillis());
- snapshotMetadata.setVersion(1);
+ private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group,
final List<ControllerServiceNode> controllerServices,
+ final List<ProcessorNode>
processors, final Set<Parameter> parameters) {
+ final VersionedFlowSnapshotMetadata snapshotMetadata =
createSnapshotMetadata();
- final Bucket bucket = new Bucket();
- bucket.setCreatedTimestamp(System.currentTimeMillis());
- bucket.setIdentifier("unit-test-bucket");
- bucket.setName("Unit Test Bucket");
+ final Bucket bucket = createBucket();
- final VersionedFlow flow = new VersionedFlow();
- flow.setBucketIdentifier("unit-test-bucket");
- flow.setBucketName("Unit Test Bucket");
- flow.setCreatedTimestamp(System.currentTimeMillis());
- flow.setIdentifier("unit-test-flow");
- flow.setName("Unit Test Flow");
+ final VersionedFlow flow = createVersionedFlow();
- final BundleCoordinate coordinate =
getSystemBundle().getBundleDetails().getCoordinate();
- final Bundle bundle = new Bundle();
- bundle.setArtifact(coordinate.getId());
- bundle.setGroup(coordinate.getGroup());
- bundle.setVersion(coordinate.getVersion());
+ createBundle();
final NiFiRegistryFlowMapper flowMapper = new
NiFiRegistryFlowMapper(getExtensionManager());
+ final List<ProcessorNode> processorNodes;
+ final List<ControllerServiceNode> controllerServiceNodes;
+ final List<Port> inputPorts;
+ final List<Connection> connections;
+ final List<ProcessGroup> processGroups;
+ final Set<VersionedProcessGroup> versionedProcessGroups;
+
+ if (group == null) {
+ processorNodes = processors;
+ controllerServiceNodes = controllerServices;
+ inputPorts = Collections.EMPTY_LIST;
+ connections = Collections.EMPTY_LIST;
+ versionedProcessGroups = Collections.EMPTY_SET;
+ } else {
+ processorNodes =
group.getProcessors().stream().collect(Collectors.toList());
+ controllerServiceNodes =
group.getControllerServices(false).stream().collect(Collectors.toList());
+ inputPorts =
group.getInputPorts().stream().collect(Collectors.toList());
+ connections =
group.getConnections().stream().collect(Collectors.toList());
+ processGroups =
group.getProcessGroups().stream().collect(Collectors.toList());
+
+ final VersionedProcessGroup versionedGroup =
flowMapper.mapProcessGroup(group,
getFlowController().getControllerServiceProvider(),getFlowController().getFlowRegistryClient(),true);
+ processGroups.forEach(processGroup->
+
versionedGroup.getProcessGroups().stream().filter(versionedProcessGroup ->
versionedProcessGroup.getName().equals(processGroup.getName()))
+ .forEach(filteredProcessGroup ->
processGroup.setVersionedComponentId(filteredProcessGroup.getIdentifier())));
+ versionedProcessGroups =
versionedGroup.getProcessGroups().stream().collect(Collectors.toSet());
Review comment:
Same as above, but with `new HashSet<>(...)`.
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
##########
@@ -384,7 +391,67 @@ public void
testChangeVersionFromExplicitValueToParameterSensitiveProperty() {
assertEquals("#{secret-param}",
nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());
}
+ @Test
+ public void testUpdateFlowWithInputPortMovedFromGroupAToGroupB() {
+ //Testing use case NIFI-9018
+ //Create Process Group A
+ final ProcessGroup groupA = createProcessGroup("group-a-id", "Group
A", getRootGroup());
+
+ //Add Input Port to Process Group A
+ final Port port =
getFlowController().getFlowManager().createLocalInputPort("input-port-id",
"Input Port");
+ groupA.addInputPort(port);
+
+ //Create a snapshot
+ final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+ //Create Process Group B under Process Group A
+ final ProcessGroup groupB = createProcessGroup("group-b-id", "Group
B", groupA);
+
+ //Move Input Port from Process Group A to Process Group B
+ moveInputPort(port, groupB);
+
+ //Create Processor under Process Group A
+ final ProcessorNode processor =
createProcessorNode(GenerateProcessor.class, groupA);
+
+ //Create Connection between Processor in Process Group A and Input
Port in Process Group B
+ final Connection connection = connect(groupA, processor, port,
processor.getRelationships());
+ //Create another snapshot
+ final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+ //Change Process Group A version to Version 1
+ groupA.updateFlow(version1, null, false, true, true);
+
+ //Process Group A should have only one Input Port and no Process
Groups, Processors or Connections
+ assertTrue(groupA.getProcessGroups().isEmpty());
+ assertTrue(groupA.getProcessors().isEmpty());
+ assertTrue(groupA.getConnections().isEmpty());
+ assertEquals(1L, groupA.getInputPorts().stream().filter(p ->
p.getVersionedComponentId().get().equals(port.getVersionedComponentId().get())).count());
Review comment:
Readability improvement suggestion:
```suggestion
assertEquals(1, groupA.getInputPorts().size());
assertEquals(port.getVersionedComponentId(),
groupA.getInputPortByName("Input Port").getVersionedComponentId());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]