This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 21922af  NIFI-9794: If a node is OFFLOADING, do not allow connections to be deleted. Also if we fail to notify the node that it needs to offload its data, change its state back to DISCONNECTED. (#5865)
21922af is described below

commit 21922af90cf5b9b31995d1489d1c80d803dd6842
Author: markap14 <[email protected]>
AuthorDate: Mon Mar 14 15:45:02 2022 -0400

    NIFI-9794: If a node is OFFLOADING, do not allow connections to be deleted. Also if we fail to notify the node that it needs to offload its data, change its state back to DISCONNECTED. (#5865)
---
 .../replication/ThreadPoolRequestReplicator.java   | 29 ++++++++++++++++++----
 .../coordination/node/NodeClusterCoordinator.java  |  5 ++++
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 343bc56..fe39c8a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -24,12 +24,14 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
 import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.exception.UriConstructionException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -164,9 +166,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
     @Override
     public AsyncClusterResponse replicate(NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers) {
         final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates();
-        final boolean mutable = isMutableRequest(method, uri.getPath());
+        final boolean mutable = isMutableRequest(method);
 
-        // If the request is mutable, ensure that all nodes are connected.
+        // If the request is mutable, ensure the appropriate state: there can be no Connecting Nodes (in order to avoid confusion where a node gets the dataflow, and then gets modified before the
+        // node fully loads the dataflow), and we cannot delete a connection while a node is OFFLOADING (otherwise, we could delete a connection while a node is trying to push data to it).
         if (mutable) {
             final List<NodeIdentifier> connecting = stateMap.get(NodeConnectionState.CONNECTING);
             if (connecting != null && !connecting.isEmpty()) {
@@ -176,6 +179,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                     throw new ConnectingNodeMutableRequestException(connecting.size() + " Nodes are currently connecting");
                 }
             }
+
+            if (isDeleteConnection(method, uri.getPath())) {
+                final List<NodeIdentifier> offloading = stateMap.get(NodeConnectionState.OFFLOADING);
+                if (offloading != null && !offloading.isEmpty()) {
+                    throw new OffloadedNodeMutableRequestException("Cannot delete conection because the following Nodes are currently being offloaded: " + offloading);
+                }
+            }
         }
 
         final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED);
@@ -243,7 +253,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             // performing an action, rather than simply proxying the request to the cluster coordinator. In this case,
             // we need to ensure that we use proper locking. We don't want two requests modifying the flow at the same
             // time, so we use a write lock if the request is mutable and a read lock otherwise.
-            final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock;
+            final Lock lock = isMutableRequest(method) ? writeLock : readLock;
             logger.debug("Obtaining lock {} in order to replicate request {} {}", lock, method, uri);
             lock.lock();
             try {
@@ -394,7 +404,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             // issue the request. This is all handled by calling performVerification, which will replicate
             // the 'vote' request to all nodes and then if successful will call back into this method to
             // replicate the actual request.
-            final boolean mutableRequest = isMutableRequest(method, uri.getPath());
+            final boolean mutableRequest = isMutableRequest(method);
             if (mutableRequest && performVerification) {
                 logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
                 performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor);
@@ -617,7 +627,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         return nodeResponse;
     }
 
-    private boolean isMutableRequest(final String method, final String uriPath) {
+    private boolean isMutableRequest(final String method) {
         switch (method.toUpperCase()) {
             case HttpMethod.GET:
             case HttpMethod.HEAD:
@@ -628,6 +638,15 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         }
     }
 
+    private boolean isDeleteConnection(final String method, final String uriPath) {
+        if (!HttpMethod.DELETE.equalsIgnoreCase(method)) {
+            return false;
+        }
+
+        final boolean isConnectionUri = ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches();
+        return isConnectionUri;
+    }
+
     /**
      * Verifies that the cluster is in a state that will allow requests to be made using the given HTTP Method and URI path
      *
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 5326073..c0034ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -950,6 +950,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                     try {
                         senderListener.offload(request);
                         reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation());
+
                         future.complete(null);
                         return;
                     } catch (final Exception e) {
@@ -965,6 +966,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                     }
                 }
 
+                updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, null,
+                    "Attempted to offload node but failed to notify node that it was to offload its data. State reset to disconnected."));
+                addNodeEvent(nodeId, "Failed to initiate node offload: " + lastException);
+
                 future.completeExceptionally(lastException);
             }
         }, "Offload " + request.getNodeId());

Reply via email to