This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a2d429045 NIFI-14758 Wait for new Coordinator when Decommissioning current Coordinator (#10101)
5a2d429045 is described below

commit 5a2d42904578080faf94523319750903a36b07e5
Author: Bryan Bende <[email protected]>
AuthorDate: Mon Jul 21 21:38:05 2025 -0400

    NIFI-14758 Wait for new Coordinator when Decommissioning current Coordinator (#10101)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../cluster/coordination/ClusterCoordinator.java   | 31 ++++++------
 .../protocol/AbstractNodeProtocolSender.java       | 19 ++++++++
 .../nifi/cluster/protocol/NodeProtocolSender.java  | 11 +++++
 .../protocol/impl/NodeProtocolSenderListener.java  |  7 +++
 .../protocol/jaxb/message/ObjectFactory.java       | 12 ++++-
 ...essage.java => NodeStatusesRequestMessage.java} | 30 +++---------
 ...ssage.java => NodeStatusesResponseMessage.java} | 39 +++++++--------
 .../cluster/protocol/message/ProtocolMessage.java  |  2 +
 .../cluster/StandardClusterDetailsFactory.java     | 10 +++-
 .../heartbeat/ClusterProtocolHeartbeatMonitor.java | 27 +++++++----
 .../coordination/node/NodeClusterCoordinator.java  | 19 ++++++++
 .../cluster/lifecycle/ClusterDecommissionTask.java | 55 ++++++++++++++++++----
 .../heartbeat/TestAbstractHeartbeatMonitor.java    | 13 +++--
 13 files changed, 193 insertions(+), 82 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index f6ad8454f7..13119aa278 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -127,6 +127,15 @@ public interface ClusterCoordinator {
      */
     NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId);
 
+    /**
+     * Retrieves the current status of the node by fetching it from the 
cluster coordinator.
+     *
+     * @param nodeId the identifier of the node
+     * @return the current status of the node with the given identifier,
+     *         or <code>null</code> if no node is known with the given 
identifier
+     */
+    NodeConnectionStatus fetchConnectionStatus(NodeIdentifier nodeId);
+
     /**
      * Returns the identifiers of all nodes that have the given connection 
state
      *
@@ -213,6 +222,13 @@ public interface ClusterCoordinator {
      */
     NodeIdentifier getLocalNodeIdentifier();
 
+    /**
+     * Waits for a cluster coordinator to be elected and returns the 
identifier of the node that is currently elected as the cluster coordinator.
+     *
+     * @return the identifier of the node that is currently elected as the 
cluster coordinator
+     */
+    NodeIdentifier waitForElectedClusterCoordinator();
+
     /**
      * @return <code>true</code> if this node has been elected the active 
cluster coordinator, <code>false</code> otherwise.
      */
@@ -292,19 +308,4 @@ public interface ClusterCoordinator {
     default void validateHeartbeat(NodeHeartbeat nodeHeartbeat) {
     }
 
-    /**
-     * Stops notifying the given listener when cluster topology events occurs
-     * @param eventListener the event listener to stop notifying
-     */
-    void unregisterEventListener(ClusterTopologyEventListener eventListener);
-
-    default String summarizeClusterState() {
-        final StringBuilder sb = new StringBuilder();
-        for (final NodeIdentifier nodeId : getNodeIdentifiers()) {
-            sb.append(nodeId.getFullDescription()).append(" : 
").append(getConnectionStatus(nodeId));
-            sb.append("\n");
-        }
-
-        return sb.toString();
-    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
index 33322ee88a..f7f8a6601d 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
@@ -24,6 +24,8 @@ import 
org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.io.socket.SocketConfiguration;
@@ -156,6 +158,23 @@ public abstract class AbstractNodeProtocolSender 
implements NodeProtocolSender {
         throw new ProtocolException("Expected message type '" + 
MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + 
responseMessage.getType() + "'");
     }
 
+    @Override
+    public NodeStatusesResponseMessage nodeStatuses(final 
NodeStatusesRequestMessage msg) throws ProtocolException {
+        final InetSocketAddress serviceAddress;
+        try {
+            serviceAddress = getServiceAddress();
+        } catch (IOException e) {
+            throw new ProtocolException("Failed to get Service Address", e);
+        }
+
+        final ProtocolMessage responseMessage = sendProtocolMessage(msg, 
serviceAddress.getHostName(), serviceAddress.getPort(), new 
CommsTimingDetails());
+        if (MessageType.NODE_STATUSES_RESPONSE == responseMessage.getType()) {
+            return (NodeStatusesResponseMessage) responseMessage;
+        }
+
+        throw new ProtocolException("Expected message type '" + 
MessageType.NODE_STATUSES_RESPONSE + "' but found '" + 
responseMessage.getType() + "'");
+    }
+
     private Socket createSocket(final InetSocketAddress socketAddress) {
         try {
             // create a socket
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index 60a3bafa59..8f1726e7ff 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -22,6 +22,8 @@ import 
org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
 
 /**
  * An interface for sending protocol messages from a node to the cluster
@@ -60,4 +62,13 @@ public interface NodeProtocolSender {
      * @throws ProtocolException if communication failed
      */
     ClusterWorkloadResponseMessage 
clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException;
+
+    /**
+     * Sends a "node statuses" request message to the Cluster Coordinator.
+     *
+     * @param msg a request message
+     * @return the response from the Cluster Coordinator containing the 
statuses of all nodes in the cluster
+     * @throws ProtocolException if communication failed
+     */
+    NodeStatusesResponseMessage nodeStatuses(NodeStatusesRequestMessage msg) 
throws ProtocolException;
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index ac1a3479b9..c25b68c817 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -27,6 +27,8 @@ import 
org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
 
 import java.io.IOException;
@@ -103,4 +105,9 @@ public class NodeProtocolSenderListener implements 
NodeProtocolSender, ProtocolL
     public ClusterWorkloadResponseMessage 
clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException {
         return sender.clusterWorkload(msg);
     }
+
+    @Override
+    public NodeStatusesResponseMessage nodeStatuses(NodeStatusesRequestMessage 
msg) throws ProtocolException {
+        return sender.nodeStatuses(msg);
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index 2518064863..4ed8f6adde 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -20,6 +20,8 @@ import jakarta.xml.bind.annotation.XmlRegistry;
 
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
 import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
@@ -101,10 +103,18 @@ public class ObjectFactory {
         return new NodeConnectionStatusRequestMessage();
     }
 
-    public NodeConnectionStatusResponseMessage 
createNodeConnectionStatusResponsetMessage() {
+    public NodeConnectionStatusResponseMessage 
createNodeConnectionStatusResponseMessage() {
         return new NodeConnectionStatusResponseMessage();
     }
 
+    public NodeStatusesRequestMessage createNodeStatusesRequestMessage() {
+        return new NodeStatusesRequestMessage();
+    }
+
+    public NodeStatusesResponseMessage createNodeStatusesResponseMessage() {
+        return new NodeStatusesResponseMessage();
+    }
+
     public HeartbeatResponseMessage createHeartbeatResponse() {
         return new HeartbeatResponseMessage();
     }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesRequestMessage.java
similarity index 58%
copy from 
nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
copy to 
nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesRequestMessage.java
index fe26c7a2cc..ef54ea8288 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesRequestMessage.java
@@ -16,29 +16,13 @@
  */
 package org.apache.nifi.cluster.protocol.message;
 
-public abstract class ProtocolMessage {
+import jakarta.xml.bind.annotation.XmlRootElement;
 
-    public static enum MessageType {
-        CONNECTION_REQUEST,
-        CONNECTION_RESPONSE,
-        OFFLOAD_REQUEST,
-        DISCONNECTION_REQUEST,
-        EXCEPTION,
-        FLOW_REQUEST,
-        FLOW_RESPONSE,
-        PING,
-        RECONNECTION_REQUEST,
-        RECONNECTION_RESPONSE,
-        SERVICE_BROADCAST,
-        HEARTBEAT,
-        HEARTBEAT_RESPONSE,
-        NODE_CONNECTION_STATUS_REQUEST,
-        NODE_CONNECTION_STATUS_RESPONSE,
-        NODE_STATUS_CHANGE,
-        CLUSTER_WORKLOAD_REQUEST,
-        CLUSTER_WORKLOAD_RESPONSE
-    }
-
-    public abstract MessageType getType();
+@XmlRootElement(name = "nodeStatusesRequest")
+public class NodeStatusesRequestMessage extends ProtocolMessage {
 
+    @Override
+    public MessageType getType() {
+        return MessageType.NODE_STATUSES_REQUEST;
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesResponseMessage.java
similarity index 57%
copy from 
nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
copy to 
nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesResponseMessage.java
index fe26c7a2cc..2e431eac04 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusesResponseMessage.java
@@ -16,29 +16,26 @@
  */
 package org.apache.nifi.cluster.protocol.message;
 
-public abstract class ProtocolMessage {
+import jakarta.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 
-    public static enum MessageType {
-        CONNECTION_REQUEST,
-        CONNECTION_RESPONSE,
-        OFFLOAD_REQUEST,
-        DISCONNECTION_REQUEST,
-        EXCEPTION,
-        FLOW_REQUEST,
-        FLOW_RESPONSE,
-        PING,
-        RECONNECTION_REQUEST,
-        RECONNECTION_RESPONSE,
-        SERVICE_BROADCAST,
-        HEARTBEAT,
-        HEARTBEAT_RESPONSE,
-        NODE_CONNECTION_STATUS_REQUEST,
-        NODE_CONNECTION_STATUS_RESPONSE,
-        NODE_STATUS_CHANGE,
-        CLUSTER_WORKLOAD_REQUEST,
-        CLUSTER_WORKLOAD_RESPONSE
+import java.util.List;
+
+@XmlRootElement(name = "nodeStatusesResponse")
+public class NodeStatusesResponseMessage extends ProtocolMessage {
+
+    private List<NodeConnectionStatus> nodeStatuses;
+
+    @Override
+    public MessageType getType() {
+        return MessageType.NODE_STATUSES_RESPONSE;
     }
 
-    public abstract MessageType getType();
+    public List<NodeConnectionStatus> getNodeStatuses() {
+        return nodeStatuses;
+    }
 
+    public void setNodeStatuses(final List<NodeConnectionStatus> nodeStatuses) 
{
+        this.nodeStatuses = nodeStatuses;
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index fe26c7a2cc..091495b921 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -35,6 +35,8 @@ public abstract class ProtocolMessage {
         NODE_CONNECTION_STATUS_REQUEST,
         NODE_CONNECTION_STATUS_RESPONSE,
         NODE_STATUS_CHANGE,
+        NODE_STATUSES_REQUEST,
+        NODE_STATUSES_RESPONSE,
         CLUSTER_WORKLOAD_REQUEST,
         CLUSTER_WORKLOAD_RESPONSE
     }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/StandardClusterDetailsFactory.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/StandardClusterDetailsFactory.java
index e53fd6bec8..e61add877c 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/StandardClusterDetailsFactory.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/StandardClusterDetailsFactory.java
@@ -49,7 +49,15 @@ public class StandardClusterDetailsFactory implements 
ClusterDetailsFactory {
             return ConnectionState.UNKNOWN;
         }
 
-        final NodeConnectionStatus connectionStatus = 
clusterCoordinator.getConnectionStatus(nodeIdentifier);
+        final NodeConnectionStatus connectionStatus;
+        if (clusterCoordinator.isActiveClusterCoordinator()) {
+            logger.debug("Getting Connection Status for Node Identifier {} 
from local state", nodeIdentifier.getId());
+            connectionStatus = 
clusterCoordinator.getConnectionStatus(nodeIdentifier);
+        } else {
+            logger.debug("Fetching Connection Status for Node Identifier {} 
from Cluster Coordinator", nodeIdentifier.getId());
+            connectionStatus = 
clusterCoordinator.fetchConnectionStatus(nodeIdentifier);
+        }
+
         if (connectionStatus == null) {
             logger.info("Cluster connection status is not currently known for 
Node Identifier {}; returning Connection State of UNKNOWN", 
nodeIdentifier.getId());
             return ConnectionState.UNKNOWN;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index e22116ca2b..ea40f02d5e 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -30,6 +30,8 @@ import 
org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.util.NiFiProperties;
@@ -136,14 +138,12 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
 
     @Override
     public ProtocolMessage handle(final ProtocolMessage msg, Set<String> 
nodeIds) throws ProtocolException {
-        switch (msg.getType()) {
-            case HEARTBEAT:
-                return handleHeartbeat((HeartbeatMessage) msg);
-            case CLUSTER_WORKLOAD_REQUEST:
-                return handleClusterWorkload((ClusterWorkloadRequestMessage) 
msg);
-            default:
-                throw new ProtocolException("Cannot handle message of type " + 
msg.getType());
-        }
+        return switch (msg.getType()) {
+            case HEARTBEAT -> handleHeartbeat((HeartbeatMessage) msg);
+            case CLUSTER_WORKLOAD_REQUEST -> 
handleClusterWorkload((ClusterWorkloadRequestMessage) msg);
+            case NODE_STATUSES_REQUEST -> 
handleNodeStatuses((NodeStatusesRequestMessage) msg);
+            default -> throw new ProtocolException("Cannot handle message of 
type " + msg.getType());
+        };
     }
 
     private ProtocolMessage handleHeartbeat(final HeartbeatMessage msg) {
@@ -203,6 +203,15 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
         return response;
     }
 
+    private ProtocolMessage handleNodeStatuses(final 
NodeStatusesRequestMessage msg) {
+        logger.debug("Received Node Statuses Request Message");
+        final NodeStatusesResponseMessage response = new 
NodeStatusesResponseMessage();
+        final List<NodeConnectionStatus> nodeStatuses = 
clusterCoordinator.getConnectionStatuses();
+        response.setNodeStatuses(nodeStatuses == null ? 
Collections.emptyList() : nodeStatuses);
+        logger.debug("Responding with Node Statuses: {}", 
response.getNodeStatuses());
+        return response;
+    }
+
     private List<NodeConnectionStatus> getUpdatedStatuses(final 
List<NodeConnectionStatus> nodeStatusList) {
         // Map node's statuses by NodeIdentifier for quick & easy lookup
         final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = 
nodeStatusList.stream()
@@ -235,6 +244,6 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
 
     @Override
     public boolean canHandle(ProtocolMessage msg) {
-        return msg.getType() == MessageType.HEARTBEAT || msg.getType() == 
MessageType.CLUSTER_WORKLOAD_REQUEST;
+        return msg.getType() == MessageType.HEARTBEAT || msg.getType() == 
MessageType.CLUSTER_WORKLOAD_REQUEST || msg.getType() == 
MessageType.NODE_STATUSES_REQUEST;
     }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 50beac29ba..a7bd5ed335 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -54,6 +54,8 @@ import 
org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesRequestMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusesResponseMessage;
 import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@@ -274,6 +276,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         return nodeId;
     }
 
+    @Override
     public NodeIdentifier waitForElectedClusterCoordinator() {
         return waitForNodeIdentifier(() -> 
getElectedActiveCoordinatorNode(false));
     }
@@ -634,6 +637,22 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         return nodeStatuses.get(nodeId.getId());
     }
 
+    @Override
+    public NodeConnectionStatus fetchConnectionStatus(final NodeIdentifier 
nodeId) {
+        final NodeStatusesResponseMessage responseMessage = 
nodeProtocolSender.nodeStatuses(new NodeStatusesRequestMessage());
+        if (responseMessage == null || responseMessage.getNodeStatuses() == 
null) {
+            logger.warn("Failed to fetch connection status for {}", nodeId);
+            return null;
+        }
+
+        logger.debug("Received Node Statuses Response [{}]", 
responseMessage.getNodeStatuses());
+
+        return responseMessage.getNodeStatuses().stream()
+                .filter(s -> s.getNodeIdentifier().equals(nodeId))
+                .findFirst()
+                .orElse(null);
+    }
+
     private NodeConnectionState getConnectionState(final NodeIdentifier 
nodeId) {
         final NodeConnectionStatus status = getConnectionStatus(nodeId);
         return status == null ? null : status.getState();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
index 46a1f269cf..eb3183db1a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
@@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit;
 
 public class ClusterDecommissionTask implements DecommissionTask {
     private static final Logger logger = 
LoggerFactory.getLogger(ClusterDecommissionTask.class);
-    private static final int delaySeconds = 3;
+    private static final int DELAY_SECONDS = 3;
 
     private final ClusterCoordinator clusterCoordinator;
     private final FlowController flowController;
@@ -62,6 +62,12 @@ public class ClusterDecommissionTask implements 
DecommissionTask {
             throw new IllegalStateException("Node has not yet connected to the 
cluster");
         }
 
+        final boolean wasCoordinator = 
clusterCoordinator.isActiveClusterCoordinator();
+        final NodeIdentifier coordinatorId = 
clusterCoordinator.getElectedActiveCoordinatorNode();
+        if (wasCoordinator) {
+            logger.info("Decommissioning Cluster Coordinator [{}]...", 
coordinatorId);
+        }
+
         flowController.stopHeartbeating();
         flowController.setClustered(false, null);
         logger.info("Instructed FlowController to stop sending heartbeats to 
Cluster Coordinator and take Cluster Disconnect actions");
@@ -72,6 +78,10 @@ public class ClusterDecommissionTask implements 
DecommissionTask {
         waitForDisconnection();
         logger.info("Successfully disconnected node from cluster");
 
+        if (wasCoordinator) {
+            waitForNotActiveClusterCoordinator(coordinatorId);
+        }
+
         offloadNode();
         logger.info("Successfully triggered Node Offload. Will wait for 
offload to complete");
 
@@ -125,7 +135,13 @@ public class ClusterDecommissionTask implements 
DecommissionTask {
 
     private void waitForState(final Set<NodeConnectionState> acceptableStates) 
throws InterruptedException {
         while (true) {
-            final NodeConnectionStatus status = 
clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+            final NodeConnectionStatus status = 
clusterCoordinator.fetchConnectionStatus(localNodeIdentifier);
+            if (status == null) {
+                logger.debug("Node connection status is null. Will wait {} 
seconds and check again", DELAY_SECONDS);
+                TimeUnit.SECONDS.sleep(DELAY_SECONDS);
+                continue;
+            }
+
             final NodeConnectionState state = status.getState();
             logger.debug("Node state is {}", state);
 
@@ -133,7 +149,7 @@ public class ClusterDecommissionTask implements 
DecommissionTask {
                 return;
             }
 
-            TimeUnit.SECONDS.sleep(delaySeconds);
+            TimeUnit.SECONDS.sleep(DELAY_SECONDS);
         }
     }
 
@@ -142,7 +158,7 @@ public class ClusterDecommissionTask implements 
DecommissionTask {
 
         int iterations = 0;
         while (true) {
-            final NodeConnectionStatus status = 
clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+            final NodeConnectionStatus status = 
clusterCoordinator.fetchConnectionStatus(localNodeIdentifier);
             final NodeConnectionState state = status.getState();
             if (state == NodeConnectionState.OFFLOADED) {
                 return;
@@ -159,10 +175,10 @@ public class ClusterDecommissionTask implements 
DecommissionTask {
                 final long byteCount = statusCounts.getQueuedContentSize();
                 logger.info("Node state is OFFLOADING. Currently, there are {} 
FlowFiles ({} bytes) left on node.", flowFileCount, byteCount);
             } else {
-                logger.debug("Node state is OFFLOADING. Will wait {} seconds 
and check again", delaySeconds);
+                logger.debug("Node state is OFFLOADING. Will wait {} seconds 
and check again", DELAY_SECONDS);
             }
 
-            TimeUnit.SECONDS.sleep(delaySeconds);
+            TimeUnit.SECONDS.sleep(DELAY_SECONDS);
         }
     }
 
@@ -174,7 +190,7 @@ public class ClusterDecommissionTask implements 
DecommissionTask {
         logger.info("Waiting for Node to be completely removed from cluster");
 
         while (true) {
-            final NodeConnectionStatus status = 
clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+            final NodeConnectionStatus status = 
clusterCoordinator.fetchConnectionStatus(localNodeIdentifier);
             if (status == null) {
                 return;
             }
@@ -184,8 +200,29 @@ public class ClusterDecommissionTask implements 
DecommissionTask {
                 return;
             }
 
-            logger.debug("Node state is {}. Will wait {} seconds and check 
again", state, delaySeconds);
-            TimeUnit.SECONDS.sleep(delaySeconds);
+            logger.debug("Node state is {}. Will wait {} seconds and check 
again", state, DELAY_SECONDS);
+            TimeUnit.SECONDS.sleep(DELAY_SECONDS);
         }
     }
+
+    private void waitForNotActiveClusterCoordinator(final NodeIdentifier 
nodeIdentifier) throws InterruptedException {
+        logger.info("Waiting to no longer be the Active Cluster Coordinator");
+
+        // We check getElectedActiveCoordinatorNode here instead of calling 
isActiveClusterCoordinator because in the case of Kubernetes leader election,
+        // isActiveClusterCoordinator will return false since the current node 
unregistered from election for the coordinator role, but it still may hold
+        // the lease in Kubernetes, so we need to check the elected 
coordinator node id directly
+        while (true) {
+            final NodeIdentifier coordinatorId = 
clusterCoordinator.getElectedActiveCoordinatorNode();
+            logger.debug("Elected Cluster Coordinator is {}", coordinatorId);
+
+            if (coordinatorId != null && 
!coordinatorId.equals(nodeIdentifier)) {
+                logger.info("New Cluster Coordinator elected: {}", 
clusterCoordinator.getElectedActiveCoordinatorNode());
+                return;
+            }
+
+            logger.debug("Still considered the Active Cluster Coordinator. 
Will wait {} seconds and check again", DELAY_SECONDS);
+            TimeUnit.SECONDS.sleep(DELAY_SECONDS);
+        }
+    }
+
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 45cbf391ae..a41051431e 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -274,6 +274,11 @@ public class TestAbstractHeartbeatMonitor {
             return statuses.get(nodeId);
         }
 
+        @Override
+        public NodeConnectionStatus fetchConnectionStatus(NodeIdentifier 
nodeId) {
+            return getConnectionStatus(nodeId);
+        }
+
         @Override
         public synchronized Set<NodeIdentifier> 
getNodeIdentifiers(NodeConnectionState... states) {
             final Set<NodeConnectionState> stateSet = new HashSet<>();
@@ -366,6 +371,11 @@ public class TestAbstractHeartbeatMonitor {
             return null;
         }
 
+        @Override
+        public NodeIdentifier waitForElectedClusterCoordinator() {
+            return null;
+        }
+
         @Override
         public List<NodeConnectionStatus> getConnectionStatuses() {
             return Collections.emptyList();
@@ -395,9 +405,6 @@ public class TestAbstractHeartbeatMonitor {
         public void registerEventListener(final ClusterTopologyEventListener 
eventListener) {
         }
 
-        @Override
-        public void unregisterEventListener(final ClusterTopologyEventListener 
eventListener) {
-        }
     }
 
 

Reply via email to