This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 21922af NIFI-9794: If a node is OFFLOADING, do not allow connections
to be deleted. Also if we fail to notify the node that it needs to offload its
data, change its state back to DISCONNECTED. (#5865)
21922af is described below
commit 21922af90cf5b9b31995d1489d1c80d803dd6842
Author: markap14 <[email protected]>
AuthorDate: Mon Mar 14 15:45:02 2022 -0400
NIFI-9794: If a node is OFFLOADING, do not allow connections to be deleted.
Also if we fail to notify the node that it needs to offload its data, change
its state back to DISCONNECTED. (#5865)
---
.../replication/ThreadPoolRequestReplicator.java | 29 ++++++++++++++++++----
.../coordination/node/NodeClusterCoordinator.java | 5 ++++
2 files changed, 29 insertions(+), 5 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 343bc56..fe39c8a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -24,12 +24,14 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
+import
org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import
org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -164,9 +166,10 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator {
@Override
public AsyncClusterResponse replicate(NiFiUser user, String method, URI
uri, Object entity, Map<String, String> headers) {
final Map<NodeConnectionState, List<NodeIdentifier>> stateMap =
clusterCoordinator.getConnectionStates();
- final boolean mutable = isMutableRequest(method, uri.getPath());
+ final boolean mutable = isMutableRequest(method);
- // If the request is mutable, ensure that all nodes are connected.
+ // If the request is mutable, ensure the cluster is in an appropriate state: there can
be no Connecting Nodes (otherwise a node could receive the dataflow, and the
dataflow could then be modified before the
+ // node fully loads it), and we cannot delete a connection
while a node is OFFLOADING (otherwise, we could delete a connection while a
node is trying to push data to it).
if (mutable) {
final List<NodeIdentifier> connecting =
stateMap.get(NodeConnectionState.CONNECTING);
if (connecting != null && !connecting.isEmpty()) {
@@ -176,6 +179,13 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator {
throw new
ConnectingNodeMutableRequestException(connecting.size() + " Nodes are currently
connecting");
}
}
+
+ if (isDeleteConnection(method, uri.getPath())) {
+ final List<NodeIdentifier> offloading =
stateMap.get(NodeConnectionState.OFFLOADING);
+ if (offloading != null && !offloading.isEmpty()) {
+ throw new OffloadedNodeMutableRequestException("Cannot
delete connection because the following Nodes are currently being offloaded: " +
offloading);
+ }
+ }
}
final List<NodeIdentifier> nodeIds =
stateMap.get(NodeConnectionState.CONNECTED);
@@ -243,7 +253,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator {
// performing an action, rather than simply proxying the request
to the cluster coordinator. In this case,
// we need to ensure that we use proper locking. We don't want two
requests modifying the flow at the same
// time, so we use a write lock if the request is mutable and a
read lock otherwise.
- final Lock lock = isMutableRequest(method, uri.getPath()) ?
writeLock : readLock;
+ final Lock lock = isMutableRequest(method) ? writeLock : readLock;
logger.debug("Obtaining lock {} in order to replicate request {}
{}", lock, method, uri);
lock.lock();
try {
@@ -394,7 +404,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator {
// issue the request. This is all handled by calling
performVerification, which will replicate
// the 'vote' request to all nodes and then if successful will
call back into this method to
// replicate the actual request.
- final boolean mutableRequest = isMutableRequest(method,
uri.getPath());
+ final boolean mutableRequest = isMutableRequest(method);
if (mutableRequest && performVerification) {
logger.debug("Performing verification (first phase of
two-phase commit) for Request ID {}", requestId);
performVerification(nodeIds, method, uri, entity,
updatedHeaders, response, merge, monitor);
@@ -617,7 +627,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator {
return nodeResponse;
}
- private boolean isMutableRequest(final String method, final String
uriPath) {
+ private boolean isMutableRequest(final String method) {
switch (method.toUpperCase()) {
case HttpMethod.GET:
case HttpMethod.HEAD:
@@ -628,6 +638,15 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator {
}
}
+ private boolean isDeleteConnection(final String method, final String
uriPath) {
+ if (!HttpMethod.DELETE.equalsIgnoreCase(method)) {
+ return false;
+ }
+
+ final boolean isConnectionUri =
ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches();
+ return isConnectionUri;
+ }
+
/**
* Verifies that the cluster is in a state that will allow requests to be
made using the given HTTP Method and URI path
*
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 5326073..c0034ce 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -950,6 +950,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
try {
senderListener.offload(request);
reportEvent(nodeId, Severity.INFO, "Node was offloaded
due to " + request.getExplanation());
+
future.complete(null);
return;
} catch (final Exception e) {
@@ -965,6 +966,10 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
}
}
+ updateNodeStatus(new NodeConnectionStatus(nodeId,
NodeConnectionState.DISCONNECTED, null,
+ "Attempted to offload node but failed to notify node that
it was to offload its data. State reset to disconnected."));
+ addNodeEvent(nodeId, "Failed to initiate node offload: " +
lastException);
+
future.completeExceptionally(lastException);
}
}, "Offload " + request.getNodeId());