This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2a3f7da NIFI-9442: When deleting a connection, ensure that when the
source of the connection is a funnel that its upstream components are checked
(recursively)
2a3f7da is described below
commit 2a3f7dafd657a8f1a2d638688889710df3ef8acd
Author: Mark Payne <[email protected]>
AuthorDate: Fri Dec 3 14:14:03 2021 -0500
NIFI-9442: When deleting a connection, ensure that when the source of the
connection is a funnel that its upstream components are checked (recursively)
Signed-off-by: Joe Gresock <[email protected]>
This closes #5568.
---
.../nifi/connectable/StandardConnection.java | 33 ++++++++++++++++++----
1 file changed, 28 insertions(+), 5 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 1cf78b6..a866c32 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -497,11 +497,16 @@ public final class StandardConnection implements
Connection, ConnectionEventList
throw new IllegalStateException("Queue not empty for " +
this.getIdentifier());
}
- if (source.isRunning()) {
- if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) {
- throw new IllegalStateException("Source of Connection (" +
source.getIdentifier() + ") is running");
- }
- }
+ // The source must be stopped unless it is a Funnel. Funnels cannot be
stopped & started. But if the source is a Funnel,
+ // it means that its sources must also be stopped, and the check must
go on recursively.
+ // This is important for a case in which we have a cluster where two
processors (for example) are connected with a funnel in between.
+ // In this case, if a user deletes the connection between the funnel
and its destination, the web request that is made will be done in two
+ // phases: (1) Verify that the request is valid and (2) Delete the
connection. But if we don't recursively ensure that the upstream components
+ // are stopped, we could have all nodes in the cluster verify the
request is valid in the first phase. But before the second phase occurs, one
+ // node may now have data within the Connection, so the second phase
(the delete) will fail. In that situation, the node's dataflow will differ
+ // from the rest of the cluster, and the node will be kicked out of
the cluster. To avoid this, we simply ensure that the source is stopped,
+ // and if the source is a funnel (which can't be stopped) that its
sources are stopped.
+ verifySourceStoppedOrFunnel(this);
final Connectable dest = destination.get();
if (dest.isRunning()) {
@@ -511,6 +516,24 @@ public final class StandardConnection implements
Connection, ConnectionEventList
}
}
+ private void verifySourceStoppedOrFunnel(final Connection connection) {
+ final Connectable sourceComponent = connection.getSource();
+ if (!sourceComponent.isRunning()) {
+ return;
+ }
+
+ final ConnectableType connectableType =
sourceComponent.getConnectableType();
+ if (connectableType != ConnectableType.FUNNEL) {
+ // Source is running and not a funnel. Source is not considered
stopped.
+ throw new IllegalStateException("Upstream component of Connection
(" + sourceComponent + ") is running");
+ }
+
+ // Source is a funnel and is running. We need to then check all of its
upstream components.
+ for (final Connection incoming : source.getIncomingConnections()) {
+ verifySourceStoppedOrFunnel(incoming);
+ }
+ }
+
@Override
public Optional<String> getVersionedComponentId() {
return Optional.ofNullable(versionedComponentId.get());