This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new f432d98166 NIFI-11159 Fixing connections with source having reassigned
id
f432d98166 is described below
commit f432d98166e3b095b388620e250b08f79e11175e
Author: Bence Simon <[email protected]>
AuthorDate: Thu Feb 9 19:32:14 2023 +0100
NIFI-11159 Fixing connections with source having reassigned id
---
.../StandardVersionedComponentSynchronizer.java | 23 ++++++++++++++++++++++
1 file changed, 23 insertions(+)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index cf9d6c5052..519089f048 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -614,16 +614,39 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
private void removeMissingConnections(final ProcessGroup group, final
VersionedProcessGroup proposed, final Map<String, Connection>
connectionsByVersionedId) {
final Set<String> connectionsRemoved = new
HashSet<>(connectionsByVersionedId.keySet());
+ final Set<String> connectionsRemovedDueToChangingSourceId = new
HashSet<>();
for (final VersionedConnection proposedConnection :
proposed.getConnections()) {
connectionsRemoved.remove(proposedConnection.getIdentifier());
}
+ // Check for any case where there's an existing connection whose ID
matches the proposed connection, but whose source doesn't match
+ // the proposed source ID. The source of a Connection should never
change from one component to another. However, there are cases
+ // in which the Versioned Component ID might change, in order to avoid
conflicts with sibling Process Groups. In such a case, we must remove
+ // the connection and create a new one, since we cannot simply change
the source in the same way that we can change the destination.
+ for (final VersionedConnection proposedConnection :
proposed.getConnections()) {
+ final Connection existingConnection =
connectionsByVersionedId.get(proposedConnection.getIdentifier());
+
+ if (existingConnection != null) {
+ final String proposedSourceId =
proposedConnection.getSource().getId();
+ final String existingSourceId =
existingConnection.getSource().getVersionedComponentId().orElse(null);
+
+ if (!Objects.equals(proposedSourceId, existingSourceId)) {
+
connectionsRemovedDueToChangingSourceId.add(proposedConnection.getIdentifier());
+ connectionsRemoved.add(proposedConnection.getIdentifier());
+ }
+ }
+ }
+
for (final String removedVersionedId : connectionsRemoved) {
final Connection connection =
connectionsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", connection, group);
group.removeConnection(connection);
}
+
+ for (final String removedVersionedId :
connectionsRemovedDueToChangingSourceId) {
+ connectionsByVersionedId.remove(removedVersionedId);
+ }
}
private void synchronizeConnections(final ProcessGroup group, final
VersionedProcessGroup proposed, final Map<String, Connection>
connectionsByVersionedId) {