This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5a2d429045 NIFI-14758 Wait for new Coordinator when Decommissioning
current Coordinator (#10101)
5a2d429045 is described below
commit 5a2d42904578080faf94523319750903a36b07e5
Author: Bryan Bende <[email protected]>
AuthorDate: Mon Jul 21 21:38:05 2025 -0400
NIFI-14758 Wait for new Coordinator when Decommissioning current
Coordinator (#10101)
Signed-off-by: David Handermann <[email protected]>
---
.../cluster/coordination/ClusterCoordinator.java | 31 ++++++------
.../protocol/AbstractNodeProtocolSender.java | 19 ++++++++
.../nifi/cluster/protocol/NodeProtocolSender.java | 11 +++++
.../protocol/impl/NodeProtocolSenderListener.java | 7 +++
.../protocol/jaxb/message/ObjectFactory.java | 12 ++++-
...essage.java => NodeStatusesRequestMessage.java} | 30 +++---------
...ssage.java => NodeStatusesResponseMessage.java} | 39 +++++++--------
.../cluster/protocol/message/ProtocolMessage.java | 2 +
.../cluster/StandardClusterDetailsFactory.java | 10 +++-
.../heartbeat/ClusterProtocolHeartbeatMonitor.java | 27 +++++++----
.../coordination/node/NodeClusterCoordinator.java | 19 ++++++++
.../cluster/lifecycle/ClusterDecommissionTask.java | 55 ++++++++++++++++++----
.../heartbeat/TestAbstractHeartbeatMonitor.java | 13 +++--
13 files changed, 193 insertions(+), 82 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index f6ad8454f7..13119aa278 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -127,6 +127,15 @@ public interface ClusterCoordinator {
*/
NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId);
+ /**
+ * Retrieves the current status of the node by fetching it from the
cluster coordinator.
+ *
+ * @param nodeId the identifier of the node
+ * @return the current status of the node with the given identifier,
+ * or <code>null</code> if no node is known with the given identifier or the cluster coordinator's response contained no node statuses
+ */
+ NodeConnectionStatus fetchConnectionStatus(NodeIdentifier nodeId);
+
/**
* Returns the identifiers of all nodes that have the given connection
state
*
@@ -213,6 +222,13 @@ public interface ClusterCoordinator {
*/
NodeIdentifier getLocalNodeIdentifier();
+ /**
+ * Waits for a cluster coordinator to be elected and returns the
identifier of the node that is currently elected as the cluster coordinator.
+ *
+ * @return the identifier of the node that is currently elected as the
cluster coordinator
+ */
+ NodeIdentifier waitForElectedClusterCoordinator();
+
/**
* @return <code>true</code> if this node has been elected the active
cluster coordinator, <code>false</code> otherwise.
*/
@@ -292,19 +308,4 @@ public interface ClusterCoordinator {
default void validateHeartbeat(NodeHeartbeat nodeHeartbeat) {
}
- /**
- * Stops notifying the given listener when cluster topology events occurs
- * @param eventListener the event listener to stop notifying
- */
- void unregisterEventListener(ClusterTopologyEventListener eventListener);
-
- default String summarizeClusterState() {
- final StringBuilder sb = new StringBuilder();
- for (final NodeIdentifier nodeId : getNodeIdentifiers()) {
- sb.append(nodeId.getFullDescription()).append(" :
").append(getConnectionStatus(nodeId));
- sb.append("\n");
- }
-
- return sb.toString();
- }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
index 33322ee88a..f7f8a6601d 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
@@ -24,6 +24,8 @@ import
org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.io.socket.SocketConfiguration;
@@ -156,6 +158,23 @@ public abstract class AbstractNodeProtocolSender
implements NodeProtocolSender {
throw new ProtocolException("Expected message type '" +
MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" +
responseMessage.getType() + "'");
}
+ @Override
+ public NodeStatusesResponseMessage nodeStatuses(final
NodeStatusesRequestMessage msg) throws ProtocolException {
+ final InetSocketAddress serviceAddress;
+ try {
+ serviceAddress = getServiceAddress();
+ } catch (IOException e) {
+ throw new ProtocolException("Failed to get Service Address", e);
+ }
+
+ final ProtocolMessage responseMessage = sendProtocolMessage(msg,
serviceAddress.getHostName(), serviceAddress.getPort(), new
CommsTimingDetails());
+ if (MessageType.NODE_STATUSES_RESPONSE == responseMessage.getType()) {
+ return (NodeStatusesResponseMessage) responseMessage;
+ }
+
+ throw new ProtocolException("Expected message type '" +
MessageType.NODE_STATUSES_RESPONSE + "' but found '" +
responseMessage.getType() + "'");
+ }
+
private Socket createSocket(final InetSocketAddress socketAddress) {
try {
// create a socket
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index 60a3bafa59..8f1726e7ff 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -22,6 +22,8 @@ import
org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
/**
* An interface for sending protocol messages from a node to the cluster
@@ -60,4 +62,13 @@ public interface NodeProtocolSender {
* @throws ProtocolException if communication failed
*/
ClusterWorkloadResponseMessage
clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException;
+
+ /**
+ * Sends a "node statuses" request message to the Cluster Coordinator.
+ *
+ * @param msg a request message
+ * @return the response from the Cluster Coordinator containing the
statuses of all nodes in the cluster
+ * @throws ProtocolException if communication failed
+ */
+ NodeStatusesResponseMessage nodeStatuses(NodeStatusesRequestMessage msg)
throws ProtocolException;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index ac1a3479b9..c25b68c817 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -27,6 +27,8 @@ import
org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
import java.io.IOException;
@@ -103,4 +105,9 @@ public class NodeProtocolSenderListener implements
NodeProtocolSender, ProtocolL
public ClusterWorkloadResponseMessage
clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException {
return sender.clusterWorkload(msg);
}
+
+ @Override
+ public NodeStatusesResponseMessage nodeStatuses(NodeStatusesRequestMessage
msg) throws ProtocolException {
+ return sender.nodeStatuses(msg);
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index 2518064863..4ed8f6adde 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -20,6 +20,8 @@ import jakarta.xml.bind.annotation.XmlRegistry;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
@@ -101,10 +103,18 @@ public class ObjectFactory {
return new NodeConnectionStatusRequestMessage();
}
- public NodeConnectionStatusResponseMessage
createNodeConnectionStatusResponsetMessage() {
+ public NodeConnectionStatusResponseMessage
createNodeConnectionStatusResponseMessage() {
return new NodeConnectionStatusResponseMessage();
}
+ public NodeStatusesRequestMessage createNodeStatusesRequestMessage() {
+ return new NodeStatusesRequestMessage();
+ }
+
+ public NodeStatusesResponseMessage createNodeStatusesResponseMessage() {
+ return new NodeStatusesResponseMessage();
+ }
+
public HeartbeatResponseMessage createHeartbeatResponse() {
return new HeartbeatResponseMessage();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesRequestMessage.java
similarity index 58%
copy from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
copy to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesRequestMessage.java
index fe26c7a2cc..ef54ea8288 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesRequestMessage.java
@@ -16,29 +16,13 @@
*/
package org.apache.nifi.cluster.protocol.message;
-public abstract class ProtocolMessage {
+import jakarta.xml.bind.annotation.XmlRootElement;
- public static enum MessageType {
- CONNECTION_REQUEST,
- CONNECTION_RESPONSE,
- OFFLOAD_REQUEST,
- DISCONNECTION_REQUEST,
- EXCEPTION,
- FLOW_REQUEST,
- FLOW_RESPONSE,
- PING,
- RECONNECTION_REQUEST,
- RECONNECTION_RESPONSE,
- SERVICE_BROADCAST,
- HEARTBEAT,
- HEARTBEAT_RESPONSE,
- NODE_CONNECTION_STATUS_REQUEST,
- NODE_CONNECTION_STATUS_RESPONSE,
- NODE_STATUS_CHANGE,
- CLUSTER_WORKLOAD_REQUEST,
- CLUSTER_WORKLOAD_RESPONSE
- }
-
- public abstract MessageType getType();
+@XmlRootElement(name = "nodeStatusesRequest")
+public class NodeStatusesRequestMessage extends ProtocolMessage {
+ @Override
+ public MessageType getType() {
+ return MessageType.NODE_STATUSES_REQUEST;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesResponseMessage.java
similarity index 57%
copy from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
copy to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesResponseMessage.java
index fe26c7a2cc..2e431eac04 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesResponseMessage.java
@@ -16,29 +16,26 @@
*/
package org.apache.nifi.cluster.protocol.message;
-public abstract class ProtocolMessage {
+import jakarta.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
- public static enum MessageType {
- CONNECTION_REQUEST,
- CONNECTION_RESPONSE,
- OFFLOAD_REQUEST,
- DISCONNECTION_REQUEST,
- EXCEPTION,
- FLOW_REQUEST,
- FLOW_RESPONSE,
- PING,
- RECONNECTION_REQUEST,
- RECONNECTION_RESPONSE,
- SERVICE_BROADCAST,
- HEARTBEAT,
- HEARTBEAT_RESPONSE,
- NODE_CONNECTION_STATUS_REQUEST,
- NODE_CONNECTION_STATUS_RESPONSE,
- NODE_STATUS_CHANGE,
- CLUSTER_WORKLOAD_REQUEST,
- CLUSTER_WORKLOAD_RESPONSE
+import java.util.List;
+
+@XmlRootElement(name = "nodeStatusesResponse")
+public class NodeStatusesResponseMessage extends ProtocolMessage {
+
+ private List<NodeConnectionStatus> nodeStatuses;
+
+ @Override
+ public MessageType getType() {
+ return MessageType.NODE_STATUSES_RESPONSE;
}
- public abstract MessageType getType();
+ public List<NodeConnectionStatus> getNodeStatuses() {
+ return nodeStatuses;
+ }
+ public void setNodeStatuses(final List<NodeConnectionStatus> nodeStatuses)
{
+ this.nodeStatuses = nodeStatuses;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index fe26c7a2cc..091495b921 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -35,6 +35,8 @@ public abstract class ProtocolMessage {
NODE_CONNECTION_STATUS_REQUEST,
NODE_CONNECTION_STATUS_RESPONSE,
NODE_STATUS_CHANGE,
+ NODE_STATUSES_REQUEST,
+ NODE_STATUSES_RESPONSE,
CLUSTER_WORKLOAD_REQUEST,
CLUSTER_WORKLOAD_RESPONSE
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/StandardClusterDetailsFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/StandardClusterDetailsFactory.java
index e53fd6bec8..e61add877c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/StandardClusterDetailsFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/StandardClusterDetailsFactory.java
@@ -49,7 +49,15 @@ public class StandardClusterDetailsFactory implements
ClusterDetailsFactory {
return ConnectionState.UNKNOWN;
}
- final NodeConnectionStatus connectionStatus =
clusterCoordinator.getConnectionStatus(nodeIdentifier);
+ final NodeConnectionStatus connectionStatus;
+ if (clusterCoordinator.isActiveClusterCoordinator()) {
+ logger.debug("Getting Connection Status for Node Identifier {}
from local state", nodeIdentifier.getId());
+ connectionStatus =
clusterCoordinator.getConnectionStatus(nodeIdentifier);
+ } else {
+ logger.debug("Fetching Connection Status for Node Identifier {}
from Cluster Coordinator", nodeIdentifier.getId());
+ connectionStatus =
clusterCoordinator.fetchConnectionStatus(nodeIdentifier);
+ }
+
if (connectionStatus == null) {
logger.info("Cluster connection status is not currently known for
Node Identifier {}; returning Connection State of UNKNOWN",
nodeIdentifier.getId());
return ConnectionState.UNKNOWN;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index e22116ca2b..ea40f02d5e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -30,6 +30,8 @@ import
org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.util.NiFiProperties;
@@ -136,14 +138,12 @@ public class ClusterProtocolHeartbeatMonitor extends
AbstractHeartbeatMonitor im
@Override
public ProtocolMessage handle(final ProtocolMessage msg, Set<String>
nodeIds) throws ProtocolException {
- switch (msg.getType()) {
- case HEARTBEAT:
- return handleHeartbeat((HeartbeatMessage) msg);
- case CLUSTER_WORKLOAD_REQUEST:
- return handleClusterWorkload((ClusterWorkloadRequestMessage)
msg);
- default:
- throw new ProtocolException("Cannot handle message of type " +
msg.getType());
- }
+ return switch (msg.getType()) {
+ case HEARTBEAT -> handleHeartbeat((HeartbeatMessage) msg);
+ case CLUSTER_WORKLOAD_REQUEST ->
handleClusterWorkload((ClusterWorkloadRequestMessage) msg);
+ case NODE_STATUSES_REQUEST ->
handleNodeStatuses((NodeStatusesRequestMessage) msg);
+ default -> throw new ProtocolException("Cannot handle message of
type " + msg.getType());
+ };
}
private ProtocolMessage handleHeartbeat(final HeartbeatMessage msg) {
@@ -203,6 +203,15 @@ public class ClusterProtocolHeartbeatMonitor extends
AbstractHeartbeatMonitor im
return response;
}
+ private ProtocolMessage handleNodeStatuses(final
NodeStatusesRequestMessage msg) {
+ logger.debug("Received Node Statuses Request Message");
+ final NodeStatusesResponseMessage response = new
NodeStatusesResponseMessage();
+ final List<NodeConnectionStatus> nodeStatuses =
clusterCoordinator.getConnectionStatuses();
+ response.setNodeStatuses(nodeStatuses == null ?
Collections.emptyList() : nodeStatuses);
+ logger.debug("Responding with Node Statuses: {}",
response.getNodeStatuses());
+ return response;
+ }
+
private List<NodeConnectionStatus> getUpdatedStatuses(final
List<NodeConnectionStatus> nodeStatusList) {
// Map node's statuses by NodeIdentifier for quick & easy lookup
final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap =
nodeStatusList.stream()
@@ -235,6 +244,6 @@ public class ClusterProtocolHeartbeatMonitor extends
AbstractHeartbeatMonitor im
@Override
public boolean canHandle(ProtocolMessage msg) {
- return msg.getType() == MessageType.HEARTBEAT || msg.getType() ==
MessageType.CLUSTER_WORKLOAD_REQUEST;
+ return msg.getType() == MessageType.HEARTBEAT || msg.getType() ==
MessageType.CLUSTER_WORKLOAD_REQUEST || msg.getType() ==
MessageType.NODE_STATUSES_REQUEST;
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 50beac29ba..a7bd5ed335 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -54,6 +54,8 @@ import
org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@@ -274,6 +276,7 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
return nodeId;
}
+ @Override
public NodeIdentifier waitForElectedClusterCoordinator() {
return waitForNodeIdentifier(() ->
getElectedActiveCoordinatorNode(false));
}
@@ -634,6 +637,22 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
return nodeStatuses.get(nodeId.getId());
}
+ @Override
+ public NodeConnectionStatus fetchConnectionStatus(final NodeIdentifier
nodeId) {
+ final NodeStatusesResponseMessage responseMessage =
nodeProtocolSender.nodeStatuses(new NodeStatusesRequestMessage());
+ if (responseMessage == null || responseMessage.getNodeStatuses() ==
null) {
+ logger.warn("Failed to fetch connection status for {}", nodeId);
+ return null;
+ }
+
+ logger.debug("Received Node Statuses Response [{}]",
responseMessage.getNodeStatuses());
+
+ return responseMessage.getNodeStatuses().stream()
+ .filter(s -> s.getNodeIdentifier().equals(nodeId))
+ .findFirst()
+ .orElse(null);
+ }
+
private NodeConnectionState getConnectionState(final NodeIdentifier
nodeId) {
final NodeConnectionStatus status = getConnectionStatus(nodeId);
return status == null ? null : status.getState();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
index 46a1f269cf..eb3183db1a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
@@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit;
public class ClusterDecommissionTask implements DecommissionTask {
private static final Logger logger =
LoggerFactory.getLogger(ClusterDecommissionTask.class);
- private static final int delaySeconds = 3;
+ private static final int DELAY_SECONDS = 3;
private final ClusterCoordinator clusterCoordinator;
private final FlowController flowController;
@@ -62,6 +62,12 @@ public class ClusterDecommissionTask implements
DecommissionTask {
throw new IllegalStateException("Node has not yet connected to the
cluster");
}
+ final boolean wasCoordinator =
clusterCoordinator.isActiveClusterCoordinator();
+ final NodeIdentifier coordinatorId =
clusterCoordinator.getElectedActiveCoordinatorNode();
+ if (wasCoordinator) {
+ logger.info("Decommissioning Cluster Coordinator [{}]...",
coordinatorId);
+ }
+
flowController.stopHeartbeating();
flowController.setClustered(false, null);
logger.info("Instructed FlowController to stop sending heartbeats to
Cluster Coordinator and take Cluster Disconnect actions");
@@ -72,6 +78,10 @@ public class ClusterDecommissionTask implements
DecommissionTask {
waitForDisconnection();
logger.info("Successfully disconnected node from cluster");
+ if (wasCoordinator) {
+ waitForNotActiveClusterCoordinator(coordinatorId);
+ }
+
offloadNode();
logger.info("Successfully triggered Node Offload. Will wait for
offload to complete");
@@ -125,7 +135,13 @@ public class ClusterDecommissionTask implements
DecommissionTask {
private void waitForState(final Set<NodeConnectionState> acceptableStates)
throws InterruptedException {
while (true) {
- final NodeConnectionStatus status =
clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+ final NodeConnectionStatus status =
clusterCoordinator.fetchConnectionStatus(localNodeIdentifier);
+ if (status == null) {
+ logger.debug("Node connection status is null. Will wait {}
seconds and check again", DELAY_SECONDS);
+ TimeUnit.SECONDS.sleep(DELAY_SECONDS);
+ continue;
+ }
+
final NodeConnectionState state = status.getState();
logger.debug("Node state is {}", state);
@@ -133,7 +149,7 @@ public class ClusterDecommissionTask implements
DecommissionTask {
return;
}
- TimeUnit.SECONDS.sleep(delaySeconds);
+ TimeUnit.SECONDS.sleep(DELAY_SECONDS);
}
}
@@ -142,7 +158,7 @@ public class ClusterDecommissionTask implements
DecommissionTask {
int iterations = 0;
while (true) {
- final NodeConnectionStatus status =
clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+ final NodeConnectionStatus status =
clusterCoordinator.fetchConnectionStatus(localNodeIdentifier);
final NodeConnectionState state = status.getState();
if (state == NodeConnectionState.OFFLOADED) {
return;
@@ -159,10 +175,10 @@ public class ClusterDecommissionTask implements
DecommissionTask {
final long byteCount = statusCounts.getQueuedContentSize();
logger.info("Node state is OFFLOADING. Currently, there are {}
FlowFiles ({} bytes) left on node.", flowFileCount, byteCount);
} else {
- logger.debug("Node state is OFFLOADING. Will wait {} seconds
and check again", delaySeconds);
+ logger.debug("Node state is OFFLOADING. Will wait {} seconds
and check again", DELAY_SECONDS);
}
- TimeUnit.SECONDS.sleep(delaySeconds);
+ TimeUnit.SECONDS.sleep(DELAY_SECONDS);
}
}
@@ -174,7 +190,7 @@ public class ClusterDecommissionTask implements
DecommissionTask {
logger.info("Waiting for Node to be completely removed from cluster");
while (true) {
- final NodeConnectionStatus status =
clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+ final NodeConnectionStatus status =
clusterCoordinator.fetchConnectionStatus(localNodeIdentifier);
if (status == null) {
return;
}
@@ -184,8 +200,29 @@ public class ClusterDecommissionTask implements
DecommissionTask {
return;
}
- logger.debug("Node state is {}. Will wait {} seconds and check
again", state, delaySeconds);
- TimeUnit.SECONDS.sleep(delaySeconds);
+ logger.debug("Node state is {}. Will wait {} seconds and check
again", state, DELAY_SECONDS);
+ TimeUnit.SECONDS.sleep(DELAY_SECONDS);
}
}
+
+    private void waitForNotActiveClusterCoordinator(final NodeIdentifier nodeIdentifier) throws InterruptedException {
+        logger.info("Waiting to no longer be the Active Cluster Coordinator");
+
+        // Poll until a node other than the given one is reported as the elected Cluster Coordinator. We check getElectedActiveCoordinatorNode here instead of
+        // calling isActiveClusterCoordinator because in the case of Kubernetes leader election, isActiveClusterCoordinator will return false since the current node
+        // unregistered from election for the coordinator role, but it still may hold the lease in Kubernetes, so we need to check the elected coordinator node id directly
+        while (true) {
+            final NodeIdentifier coordinatorId = clusterCoordinator.getElectedActiveCoordinatorNode();
+            logger.debug("Elected Cluster Coordinator is {}", coordinatorId);
+
+            if (coordinatorId != null && !coordinatorId.equals(nodeIdentifier)) {
+                logger.info("New Cluster Coordinator elected: {}", coordinatorId);
+                return;
+            }
+
+            logger.debug("Still considered the Active Cluster Coordinator. Will wait {} seconds and check again", DELAY_SECONDS);
+            TimeUnit.SECONDS.sleep(DELAY_SECONDS);
+        }
+    }
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 45cbf391ae..a41051431e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -274,6 +274,11 @@ public class TestAbstractHeartbeatMonitor {
return statuses.get(nodeId);
}
+ @Override
+ public NodeConnectionStatus fetchConnectionStatus(NodeIdentifier
nodeId) {
+ return getConnectionStatus(nodeId);
+ }
+
@Override
public synchronized Set<NodeIdentifier>
getNodeIdentifiers(NodeConnectionState... states) {
final Set<NodeConnectionState> stateSet = new HashSet<>();
@@ -366,6 +371,11 @@ public class TestAbstractHeartbeatMonitor {
return null;
}
+ @Override
+ public NodeIdentifier waitForElectedClusterCoordinator() {
+ return null;
+ }
+
@Override
public List<NodeConnectionStatus> getConnectionStatuses() {
return Collections.emptyList();
@@ -395,9 +405,6 @@ public class TestAbstractHeartbeatMonitor {
public void registerEventListener(final ClusterTopologyEventListener
eventListener) {
}
- @Override
- public void unregisterEventListener(final ClusterTopologyEventListener
eventListener) {
- }
}