This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new d4a2afc NIFI-7241: When updating Process Group to match
VersionedProcessGroup, remove any connections before recursing into child
groups. This ensures that if a Port exists in child group A and is connected to
a port in child group B, if the VersionedProcessGroup indicates to remove the
port, that connection will be removed before attempting to remove the port.
Updating and adding connections must still be done last, after all components
have been added. But missing connections can be removed [...]
d4a2afc is described below
commit d4a2afc25ce35570dff73e6cb02a6820dd7062e1
Author: Mark Payne <[email protected]>
AuthorDate: Wed Mar 11 16:48:49 2020 -0400
NIFI-7241: When updating Process Group to match VersionedProcessGroup,
remove any connections before recursing into child groups. This ensures that if
a Port exists in child group A and is connected to a port in child group B, if
the VersionedProcessGroup indicates to remove the port, that connection will be
removed before attempting to remove the port. Updating and adding connections
must still be done last, after all components have been added. But missing
connections can be removed [...]
Signed-off-by: Pierre Villard <[email protected]>
This closes #4136.
---
.../apache/nifi/groups/StandardProcessGroup.java | 42 ++++++++++++----------
1 file changed, 24 insertions(+), 18 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index cf9045f..b725d6d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3805,7 +3805,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
ControllerServiceNode service =
servicesByVersionedId.get(proposedService.getIdentifier());
if (service == null) {
service = addControllerService(group, proposedService,
componentIdSeed);
- LOG.info("Added {} to {}", service, this);
+ LOG.info("Added {} to {}", service, group);
servicesAdded.put(proposedService.getIdentifier(), service);
}
@@ -3837,6 +3837,28 @@ public final class StandardProcessGroup implements
ProcessGroup {
controllerServicesRemoved.remove(proposedService.getIdentifier());
}
+ // Before we can update child groups, we must first remove any
connections that are connected to those child groups' input/output ports.
+ // We cannot add or update connections yet, though. That must be done
at the end, as it's possible that the component that is the source/destination
of the connection
+ // has not yet been added.
+ final Map<String, Connection> connectionsByVersionedId =
group.getConnections().stream()
+ .collect(Collectors.toMap(component ->
component.getVersionedComponentId().orElse(
+
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())),
Function.identity()));
+ final Set<String> connectionsRemoved = new
HashSet<>(connectionsByVersionedId.keySet());
+
+ for (final VersionedConnection proposedConnection :
proposed.getConnections()) {
+ connectionsRemoved.remove(proposedConnection.getIdentifier());
+ }
+
+ // Connections must be the first thing to remove, not the last.
Otherwise, we will fail
+ // to remove a component if it has a connection going to it!
+ for (final String removedVersionedId : connectionsRemoved) {
+ final Connection connection =
connectionsByVersionedId.get(removedVersionedId);
+ LOG.info("Removing {} from {}", connection, group);
+ group.removeConnection(connection);
+ flowManager.onConnectionRemoved(connection);
+ }
+
+
// Child groups
final Map<String, ProcessGroup> childGroupsByVersionedId =
group.getProcessGroups().stream()
.collect(Collectors.toMap(component ->
component.getVersionedComponentId().orElse(
@@ -4028,12 +4050,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
}
- // Connections
- final Map<String, Connection> connectionsByVersionedId =
group.getConnections().stream()
- .collect(Collectors.toMap(component ->
component.getVersionedComponentId().orElse(
-
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())),
Function.identity()));
- final Set<String> connectionsRemoved = new
HashSet<>(connectionsByVersionedId.keySet());
-
+ // Add and update Connections
for (final VersionedConnection proposedConnection :
proposed.getConnections()) {
final Connection connection =
connectionsByVersionedId.get(proposedConnection.getIdentifier());
if (connection == null) {
@@ -4047,21 +4064,10 @@ public final class StandardProcessGroup implements
ProcessGroup {
updateConnection(connection, proposedConnection);
LOG.info("Updated {}", connection);
}
-
- connectionsRemoved.remove(proposedConnection.getIdentifier());
}
// Remove components that exist in the local flow but not the remote
flow.
- // Connections must be the first thing to remove, not the last.
Otherwise, we will fail
- // to remove a component if it has a connection going to it!
- for (final String removedVersionedId : connectionsRemoved) {
- final Connection connection =
connectionsByVersionedId.get(removedVersionedId);
- LOG.info("Removing {} from {}", connection, group);
- group.removeConnection(connection);
- flowManager.onConnectionRemoved(connection);
- }
-
// Once the appropriate connections have been removed, we may now
update Processors' auto-terminated relationships.
// We cannot do this above, in the 'updateProcessor' call because if a
connection is removed and changed to auto-terminated,
// then updating this in the updateProcessor call above would attempt
to set the Relationship to being auto-terminated while a