Repository: nifi
Updated Branches:
  refs/heads/master 3e9867d5d -> 7779af69b


NIFI-2292: Funnel all cluster node status changes through the cluster 
coordinator instead of having each node broadcast changes to the whole cluster. 
This gives us the ability to increment the updateId consistently without race 
conditions.

This closes #717

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7779af69
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7779af69
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7779af69

Branch: refs/heads/master
Commit: 7779af69b4f6fb562e9ff0a9d135e355b8245854
Parents: 3e9867d
Author: Mark Payne <[email protected]>
Authored: Mon Jul 25 13:41:48 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Mon Jul 25 23:17:37 2016 -0400

----------------------------------------------------------------------
 .../coordination/ClusterCoordinator.java        |   5 +
 .../coordination/node/DisconnectionCode.java    |   5 +
 .../ClusterCoordinationProtocolSender.java      |   2 +-
 .../heartbeat/AbstractHeartbeatMonitor.java     |   6 +-
 .../http/replication/RequestReplicator.java     |   5 +-
 .../ThreadPoolRequestReplicator.java            |  17 +-
 .../node/NodeClusterCoordinator.java            |  82 ++++++++--
 .../heartbeat/TestAbstractHeartbeatMonitor.java |   5 +
 .../TestThreadPoolRequestReplicator.java        |  13 +-
 .../node/TestNodeClusterCoordinator.java        | 155 +++++++++----------
 .../apache/nifi/controller/FlowController.java  |   4 +-
 .../nifi/web/StandardNiFiContentAccess.java     |   2 +-
 .../StandardNiFiWebConfigurationContext.java    |   2 +-
 .../nifi/web/api/ApplicationResource.java       |  25 ++-
 .../apache/nifi/web/api/ControllerResource.java |   8 +
 .../apache/nifi/web/api/CountersResource.java   |   2 +-
 .../nifi/web/api/ProcessGroupResource.java      |   2 +-
 .../nifi/web/api/SystemDiagnosticsResource.java |   2 +-
 18 files changed, 213 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index a2e17b5..4894fc5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -173,6 +173,11 @@ public interface ClusterCoordinator {
     NodeIdentifier getElectedActiveCoordinatorNode();
 
     /**
+     * @return the identifier of this node, if it is known, <code>null</code> 
if the Node Identifier has not yet been established.
+     */
+    NodeIdentifier getLocalNodeIdentifier();
+
+    /**
      * @return <code>true</code> if this node has been elected the active 
cluster coordinator, <code>false</code> otherwise.
      */
     boolean isActiveClusterCoordinator();

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
index ae18699..48ca41d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
@@ -68,6 +68,11 @@ public enum DisconnectionCode {
     FAILED_TO_SERVICE_REQUEST("Failed to Service Request"),
 
     /**
+     * Coordinator received a heartbeat from node, but the node is 
disconnected from the cluster
+     */
+    HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE("Heartbeat Received from 
Disconnected Node"),
+
+    /**
      * Node is being shut down
      */
     NODE_SHUTDOWN("Node was Shutdown");

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
index a1af0f8..b49f57c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
@@ -25,7 +25,7 @@ import 
org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
 
 /**
- * An interface for sending protocol messages from the cluster manager to 
nodes.
+ * An interface for sending protocol messages from the cluster coordinator to 
nodes.
  *
  */
 public interface ClusterCoordinationProtocolSender {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index be9559d..0bd84d6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -126,9 +126,7 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
     protected synchronized void monitorHeartbeats() {
         final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = 
getLatestHeartbeats();
         if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
-            // failed to fetch heartbeats; don't change anything.
-            clusterCoordinator.reportEvent(null, Severity.INFO, "Failed to 
retrieve any new heartbeat information for nodes. "
-                + "Will not make any decisions based on heartbeats.");
+            logger.debug("Received no new heartbeats. Will not disconnect any 
nodes due to lack of heartbeat");
             return;
         }
 
@@ -213,7 +211,7 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
             } else {
                 // disconnected nodes should not heartbeat, so we need to 
issue a disconnection request.
                 logger.info("Ignoring received heartbeat from disconnected 
node " + nodeId + ".  Issuing disconnection request.");
-                clusterCoordinator.requestNodeDisconnect(nodeId, 
connectionStatus.getDisconnectCode(), connectionStatus.getDisconnectReason());
+                clusterCoordinator.requestNodeDisconnect(nodeId, 
DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE, 
DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString());
                 removeHeartbeat(nodeId);
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index 6724901..bfe528d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -98,10 +98,13 @@ public interface RequestReplicator {
      * @param headers any HTTP headers
      * @param indicateReplicated if <code>true</code>, will add a header 
indicating to the receiving nodes that the request
      *            has already been replicated, so the receiving node will not 
replicate the request itself.
+     * @param performVerification if <code>true</code>, and the request is 
mutable, will verify that all nodes are connected before
+     *            making the request and that all nodes are able to perform 
the request before acutally attempting to perform the task.
+     *            If false, will perform no such verification
      *
      * @return an AsyncClusterResponse that indicates the current status of 
the request and provides an identifier for obtaining an updated response later
      */
-    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, 
URI uri, Object entity, Map<String, String> headers, boolean 
indicateReplicated);
+    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, 
URI uri, Object entity, Map<String, String> headers, boolean 
indicateReplicated, boolean performVerification);
 
     /**
      * <p>

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index f4fcc85..c5a8af5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -211,11 +211,12 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
 
         final Set<NodeIdentifier> nodeIdSet = new HashSet<>(nodeIds);
 
-        return replicate(nodeIdSet, method, uri, entity, headers, true);
+        return replicate(nodeIdSet, method, uri, entity, headers, true, true);
     }
 
     @Override
-    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers, final boolean 
indicateReplicated) {
+    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers,
+            final boolean indicateReplicated, final boolean 
performVerification) {
         final Map<String, String> updatedHeaders = new HashMap<>(headers);
 
         
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, 
TypeOneUUIDGenerator.generateId().toString());
@@ -242,12 +243,12 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             lock.lock();
             try {
                 logger.debug("Lock {} obtained in order to replicate request 
{} {}", method, uri);
-                return replicate(nodeIds, method, uri, entity, updatedHeaders, 
true, null);
+                return replicate(nodeIds, method, uri, entity, updatedHeaders, 
performVerification, null);
             } finally {
                 lock.unlock();
             }
         } else {
-            return replicate(nodeIds, method, uri, entity, updatedHeaders, 
true, null);
+            return replicate(nodeIds, method, uri, entity, updatedHeaders, 
performVerification, null);
         }
     }
 
@@ -259,13 +260,13 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      * @param uri the URI to send the request to
      * @param entity the entity to use
      * @param headers the HTTP Headers
-     * @param performVerification whether or not to use 2-phase commit to 
verify that all nodes can handle the request. Ignored if request is not mutable.
+     * @param performVerification whether or not to verify that all nodes in 
the cluster are connected and that all nodes can perform request. Ignored if 
request is not mutable.
      * @param response the response to update with the results
      *
      * @return an AsyncClusterResponse that can be used to obtain the result
      */
     private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers, boolean 
performVerification,
-        StandardAsyncClusterResponse response) {
+            StandardAsyncClusterResponse response) {
 
         // state validation
         Objects.requireNonNull(nodeIds);
@@ -298,7 +299,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         final String requestId = 
updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> 
UUID.randomUUID().toString());
 
         if (performVerification) {
-            verifyState(method, uri.getPath());
+            verifyClusterState(method, uri.getPath());
         }
 
         int numRequests = responseMap.size();
@@ -530,7 +531,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      *
      * @throw IllegalClusterStateException if the cluster is not in a state 
that allows a request to made to the given URI Path using the given HTTP Method
      */
-    private void verifyState(final String httpMethod, final String uriPath) {
+    private void verifyClusterState(final String httpMethod, final String 
uriPath) {
         final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || 
HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod);
 
         // check that the request can be applied

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 3f8fa76..b31530f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -138,14 +139,23 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         this.nodeId = nodeId;
     }
 
-    NodeIdentifier getLocalNodeIdentifier() {
+    @Override
+    public NodeIdentifier getLocalNodeIdentifier() {
         return nodeId;
     }
 
     private NodeIdentifier waitForLocalNodeIdentifier() {
+        return waitForNodeIdentifier(() -> getLocalNodeIdentifier());
+    }
+
+    private NodeIdentifier waitForElectedClusterCoordinator() {
+        return waitForNodeIdentifier(() -> 
getElectedActiveCoordinatorNode(false));
+    }
+
+    private NodeIdentifier waitForNodeIdentifier(final 
Supplier<NodeIdentifier> fetchNodeId) {
         NodeIdentifier localNodeId = null;
         while (localNodeId == null) {
-            localNodeId = getLocalNodeIdentifier();
+            localNodeId = fetchNodeId.get();
             if (localNodeId == null) {
                 try {
                     Thread.sleep(100L);
@@ -279,8 +289,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     @Override
     public void requestNodeDisconnect(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode, final String explanation) {
-        final int numConnected = 
getNodeIdentifiers(NodeConnectionState.CONNECTED).size();
-        if (numConnected == 1) {
+        final Set<NodeIdentifier> connectedNodeIds = 
getNodeIdentifiers(NodeConnectionState.CONNECTED);
+        if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) 
{
             throw new IllegalNodeDisconnectionException("Cannot disconnect 
node " + nodeId + " because it is the only node currently connected");
         }
 
@@ -514,17 +524,27 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     @Override
     public NodeIdentifier getElectedActiveCoordinatorNode() {
+        return getElectedActiveCoordinatorNode(true);
+    }
+
+    private NodeIdentifier getElectedActiveCoordinatorNode(final boolean 
warnOnError) {
         final String electedNodeAddress;
         try {
             electedNodeAddress = getElectedActiveCoordinatorAddress();
         } catch (final IOException ioe) {
-            logger.warn("Failed to determine which node is elected active 
Cluster Coordinator. There may be no coordinator currently:", ioe);
+            if (warnOnError) {
+                logger.warn("Failed to determine which node is elected active 
Cluster Coordinator. There may be no coordinator currently:", ioe);
+            }
+
             return null;
         }
 
         final int colonLoc = electedNodeAddress.indexOf(':');
         if (colonLoc < 1) {
-            logger.warn("Failed to determine which node is elected active 
Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a 
valid address", electedNodeAddress);
+            if (warnOnError) {
+                logger.warn("Failed to determine which node is elected active 
Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a 
valid address", electedNodeAddress);
+            }
+
             return null;
         }
 
@@ -534,7 +554,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         try {
             electedNodePort = Integer.parseInt(portString);
         } catch (final NumberFormatException nfe) {
-            logger.warn("Failed to determine which node is elected active 
Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a 
valid address", electedNodeAddress);
+            if (warnOnError) {
+                logger.warn("Failed to determine which node is elected active 
Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a 
valid address", electedNodeAddress);
+            }
+
             return null;
         }
 
@@ -544,7 +567,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             .findFirst()
             .orElse(null);
 
-        if (electedNodeId == null) {
+        if (electedNodeId == null && warnOnError) {
             logger.warn("Failed to determine which node is elected active 
Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node 
with this address", electedNodeAddress);
         }
 
@@ -610,16 +633,37 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         logger.debug("State of cluster nodes is now {}", nodeStatuses);
 
         if (currentState == null || currentState != status.getState()) {
-            notifyOthersOfNodeStatusChange(status);
+            // We notify all nodes of the status change if either this node is 
the current cluster coordinator, OR if the node was
+            // the cluster coordinator and no longer is. This is done because 
if a user disconnects the cluster coordinator, we need
+            // to broadcast to the cluster that this node is no longer the 
coordinator. Otherwise, all nodes but this one will still
+            // believe that this node is connected to the cluster.
+            final boolean notifyAllNodes = isActiveClusterCoordinator() || 
(currentStatus != null && 
currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR));
+            notifyOthersOfNodeStatusChange(status, notifyAllNodes);
         }
     }
 
+    void notifyOthersOfNodeStatusChange(final NodeConnectionStatus 
updatedStatus) {
+        notifyOthersOfNodeStatusChange(updatedStatus, 
isActiveClusterCoordinator());
+    }
 
-    private void notifyOthersOfNodeStatusChange(final NodeConnectionStatus 
updatedStatus) {
-        final Set<NodeIdentifier> nodesToNotify = 
getNodeIdentifiers(NodeConnectionState.CONNECTED, 
NodeConnectionState.CONNECTING);
-
-        // Do not notify ourselves because we already know about the status 
update.
-        nodesToNotify.remove(getLocalNodeIdentifier());
+    /**
+     * Notifies other nodes that the status of a node changed
+     *
+     * @param updatedStatus the updated status for a node in the cluster
+     * @param notifyAllNodes if <code>true</code> will notify all nodes. If 
<code>false</code>, will notify only the cluster coordinator
+     */
+    void notifyOthersOfNodeStatusChange(final NodeConnectionStatus 
updatedStatus, final boolean notifyAllNodes) {
+        // If this node is the active cluster coordinator, then we are going 
to replicate to all nodes.
+        // Otherwise, get the active coordinator (or wait for one to become 
active) and then notify the coordinator.
+        final Set<NodeIdentifier> nodesToNotify;
+        if (notifyAllNodes) {
+            nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, 
NodeConnectionState.CONNECTING);
+
+            // Do not notify ourselves because we already know about the 
status update.
+            nodesToNotify.remove(getLocalNodeIdentifier());
+        } else {
+            nodesToNotify = 
Collections.singleton(waitForElectedClusterCoordinator());
+        }
 
         final NodeStatusChangeMessage message = new NodeStatusChangeMessage();
         message.setNodeId(updatedStatus.getNodeIdentifier());
@@ -767,6 +811,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
                     nodeId, updatedStatus, oldStatus);
             }
         }
+
+        if (isActiveClusterCoordinator()) {
+            
notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus());
+        }
     }
 
     private NodeIdentifier resolveNodeId(final NodeIdentifier 
proposedIdentifier) {
@@ -872,6 +920,12 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
      */
     @Override
     public void afterRequest(final String uriPath, final String method, final 
Set<NodeResponse> nodeResponses) {
+        // if we are not the active cluster coordinator, then we are not 
responsible for monitoring the responses,
+        // as the cluster coordinator is responsible for performing the actual 
request replication.
+        if (!isActiveClusterCoordinator()) {
+            return;
+        }
+
         final boolean mutableRequest = isMutableRequest(method);
 
         /*

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 81d72ed..9ef0a14 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -305,6 +305,11 @@ public class TestAbstractHeartbeatMonitor {
         @Override
         public void removeRole(String clusterRole) {
         }
+
+        @Override
+        public NodeIdentifier getLocalNodeIdentifier() {
+            return null;
+        }
     }
 
     public static class ReportedEvent {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 2af3e88..5eac846 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -87,7 +87,7 @@ public class TestThreadPoolRequestReplicator {
             final URI uri = new URI("http://localhost:8080/processors/1";);
             final Entity entity = new ProcessorEntity();
 
-            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), 
true);
+            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), 
true, true);
 
             // We should get back the same response object
             assertTrue(response == 
replicator.getClusterResponse(response.getRequestIdentifier()));
@@ -115,7 +115,7 @@ public class TestThreadPoolRequestReplicator {
             final URI uri = new URI("http://localhost:8080/processors/1";);
             final Entity entity = new ProcessorEntity();
 
-            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), 
true);
+            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), 
true, true);
 
             // We should get back the same response object
             assertTrue(response == 
replicator.getClusterResponse(response.getRequestIdentifier()));
@@ -151,7 +151,7 @@ public class TestThreadPoolRequestReplicator {
             final URI uri = new URI("http://localhost:8080/processors/1";);
             final Entity entity = new ProcessorEntity();
 
-            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), 
true);
+            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), 
true, true);
             assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS));
         } , null, 0L, new IllegalArgumentException("Exception created for unit 
test"));
     }
@@ -191,7 +191,7 @@ public class TestThreadPoolRequestReplicator {
 
         try {
             final AsyncClusterResponse clusterResponse = 
replicator.replicate(nodeIds, HttpMethod.POST,
-                new URI("http://localhost:80/processors/1";), new 
ProcessorEntity(), new HashMap<>(), true);
+                new URI("http://localhost:80/processors/1";), new 
ProcessorEntity(), new HashMap<>(), true, true);
             clusterResponse.awaitMergedResponse();
 
             // Ensure that we received two requests - the first should contain 
the X-NcmExpects header; the second should not.
@@ -235,7 +235,8 @@ public class TestThreadPoolRequestReplicator {
         Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
         final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", 
null, null) {
             @Override
-            public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, 
String method, URI uri, Object entity, Map<String, String> headers, boolean 
indicateReplicated) {
+            public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, 
String method, URI uri, Object entity, Map<String, String> headers,
+                    boolean indicateReplicated, boolean verify) {
                 return null;
             }
         };
@@ -308,7 +309,7 @@ public class TestThreadPoolRequestReplicator {
 
         try {
             final AsyncClusterResponse clusterResponse = 
replicator.replicate(nodeIds, HttpMethod.POST,
-                new URI("http://localhost:80/processors/1";), new 
ProcessorEntity(), new HashMap<>(), true);
+                new URI("http://localhost:80/processors/1";), new 
ProcessorEntity(), new HashMap<>(), true, true);
             clusterResponse.awaitMergedResponse();
 
             Assert.fail("Expected to get an IllegalClusterStateException but 
did not");

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 25c55a0..6f0ad0f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -59,7 +59,7 @@ import org.mockito.stubbing.Answer;
 public class TestNodeClusterCoordinator {
     private NodeClusterCoordinator coordinator;
     private ClusterCoordinationProtocolSenderListener senderListener;
-    private List<NodeStatusChangeMessage> nodeStatusChangeMessages;
+    private List<NodeConnectionStatus> nodeStatuses;
 
     private Properties createProperties() {
         final Properties props = new Properties();
@@ -68,25 +68,20 @@ public class TestNodeClusterCoordinator {
     }
 
     @Before
-    @SuppressWarnings("unchecked")
     public void setup() throws IOException {
         senderListener = 
Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
-        nodeStatusChangeMessages = Collections.synchronizedList(new 
ArrayList<>());
-
-        Mockito.doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable 
{
-                final NodeStatusChangeMessage statusChangeMessage = 
invocation.getArgumentAt(1, NodeStatusChangeMessage.class);
-                nodeStatusChangeMessages.add(statusChangeMessage);
-                return null;
-            }
-        }).when(senderListener).notifyNodeStatusChange(Mockito.any(Set.class), 
Mockito.any(NodeStatusChangeMessage.class));
+        nodeStatuses = Collections.synchronizedList(new ArrayList<>());
 
         final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, revisionManager, createProperties());
+        coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, revisionManager, createProperties()) {
+            @Override
+            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes) {
+                nodeStatuses.add(updatedStatus);
+            }
+        };
 
         final FlowService flowService = Mockito.mock(FlowService.class);
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], 
new byte[50], new byte[50]);
@@ -136,7 +131,11 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties());
+        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties()) {
+            @Override
+            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes) {
+            }
+        };
 
         final NodeIdentifier requestedNodeId = createNodeId(6);
         final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId);
@@ -170,7 +169,11 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties());
+        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties()) {
+            @Override
+            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes) {
+            }
+        };
 
         final FlowService flowService = Mockito.mock(FlowService.class);
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], 
new byte[50], new byte[50]);
@@ -200,80 +203,60 @@ public class TestNodeClusterCoordinator {
         // Create a connection request message and send to the coordinator
         requestConnection(createNodeId(1), coordinator);
 
-        while (nodeStatusChangeMessages.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(20L);
         }
-        assertEquals(NodeConnectionState.CONNECTING, 
nodeStatusChangeMessages.get(0).getNodeConnectionStatus().getState());
-        nodeStatusChangeMessages.clear();
+        assertEquals(NodeConnectionState.CONNECTING, 
nodeStatuses.get(0).getState());
+        nodeStatuses.clear();
 
         // Finish connecting. This should notify all that the status is now 
'CONNECTED'
         coordinator.finishNodeConnection(nodeId);
 
-        while (nodeStatusChangeMessages.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(20L);
         }
-        assertEquals(NodeConnectionState.CONNECTED, 
nodeStatusChangeMessages.get(0).getNodeConnectionStatus().getState());
+        assertEquals(NodeConnectionState.CONNECTED, 
nodeStatuses.get(0).getState());
         assertEquals(NodeConnectionState.CONNECTED, 
coordinator.getConnectionStatus(nodeId).getState());
     }
 
     @Test(timeout = 5000)
-    @SuppressWarnings("unchecked")
     public void testStatusChangesReplicated() throws InterruptedException, 
IOException {
-        final ClusterCoordinationProtocolSenderListener senderListener = 
Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
-        final List<NodeStatusChangeMessage> msgs = 
Collections.synchronizedList(new ArrayList<>());
-
-        Mockito.doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable 
{
-                final NodeStatusChangeMessage statusChangeMessage = 
invocation.getArgumentAt(1, NodeStatusChangeMessage.class);
-                msgs.add(statusChangeMessage);
-                return null;
-            }
-        }).when(senderListener).notifyNodeStatusChange(Mockito.any(Set.class), 
Mockito.any(NodeStatusChangeMessage.class));
-
-        final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
-        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties());
-
-        final FlowService flowService = Mockito.mock(FlowService.class);
-        final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], 
new byte[50], new byte[50]);
-        Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
-        coordinator.setFlowService(flowService);
 
         // Create a connection request message and send to the coordinator
         final NodeIdentifier requestedNodeId = createNodeId(1);
         requestConnection(requestedNodeId, coordinator);
 
         // The above connection request should trigger a 'CONNECTING' state 
transition to be replicated
-        while (msgs.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(20L);
         }
-        final NodeStatusChangeMessage connectingMsg = msgs.get(0);
-        assertEquals(NodeConnectionState.CONNECTING, 
connectingMsg.getNodeConnectionStatus().getState());
-        assertEquals(requestedNodeId, connectingMsg.getNodeId());
+        final NodeConnectionStatus connectingStatus = nodeStatuses.get(0);
+        assertEquals(NodeConnectionState.CONNECTING, 
connectingStatus.getState());
+        assertEquals(requestedNodeId, connectingStatus.getNodeIdentifier());
 
         // set node status to connected
         coordinator.finishNodeConnection(requestedNodeId);
 
         // the above method will result in the node identifier becoming 
'CONNECTED'. Wait for this to happen and clear the map
-        while (msgs.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(20L);
         }
-        msgs.clear();
+        nodeStatuses.clear();
 
         coordinator.disconnectionRequestedByNode(requestedNodeId, 
DisconnectionCode.NODE_SHUTDOWN, "Unit Test");
 
-        while (msgs.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(20L);
         }
 
-        assertEquals(1, msgs.size());
-        final NodeStatusChangeMessage statusChangeMsg = msgs.get(0);
-        assertNotNull(statusChangeMsg);
-        assertEquals(createNodeId(1), statusChangeMsg.getNodeId());
-        assertEquals(DisconnectionCode.NODE_SHUTDOWN, 
statusChangeMsg.getNodeConnectionStatus().getDisconnectCode());
-        assertEquals("Unit Test", 
statusChangeMsg.getNodeConnectionStatus().getDisconnectReason());
+        assertEquals(1, nodeStatuses.size());
+        final NodeConnectionStatus statusChange = nodeStatuses.get(0);
+        assertNotNull(statusChange);
+        assertEquals(createNodeId(1), statusChange.getNodeIdentifier());
+        assertEquals(DisconnectionCode.NODE_SHUTDOWN, 
statusChange.getDisconnectCode());
+        assertEquals("Unit Test", statusChange.getDisconnectReason());
     }
 
 
@@ -343,20 +326,20 @@ public class TestNodeClusterCoordinator {
         coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), 
NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         // wait for the status change message and clear it
-        while (nodeStatusChangeMessages.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(10L);
         }
-        nodeStatusChangeMessages.clear();
+        nodeStatuses.clear();
 
         coordinator.requestNodeDisconnect(nodeId1, 
DisconnectionCode.USER_DISCONNECTED, "Unit Test");
         assertEquals(NodeConnectionState.DISCONNECTED, 
coordinator.getConnectionStatus(nodeId1).getState());
 
-        while (nodeStatusChangeMessages.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(10L);
         }
-        final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0);
-        assertEquals(nodeId1, msg.getNodeId());
-        assertEquals(NodeConnectionState.DISCONNECTED, 
msg.getNodeConnectionStatus().getState());
+        final NodeConnectionStatus status = nodeStatuses.get(0);
+        assertEquals(nodeId1, status.getNodeIdentifier());
+        assertEquals(NodeConnectionState.DISCONNECTED, status.getState());
     }
 
 
@@ -364,13 +347,17 @@ public class TestNodeClusterCoordinator {
     public void testCannotDisconnectLastNode() throws InterruptedException {
         // Add a connected node
         final NodeIdentifier nodeId1 = createNodeId(1);
+        final NodeIdentifier nodeId2 = createNodeId(2);
         coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         // wait for the status change message and clear it
-        while (nodeStatusChangeMessages.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(10L);
         }
-        nodeStatusChangeMessages.clear();
+        nodeStatuses.clear();
+
+        coordinator.requestNodeDisconnect(nodeId2, 
DisconnectionCode.USER_DISCONNECTED, "Unit Test");
 
         try {
             coordinator.requestNodeDisconnect(nodeId1, 
DisconnectionCode.USER_DISCONNECTED, "Unit Test");
@@ -378,6 +365,9 @@ public class TestNodeClusterCoordinator {
         } catch (final IllegalNodeDisconnectionException inde) {
             // expected
         }
+
+        // Should still be able to request that node 2 disconnect, since it's 
not the node that is connected
+        coordinator.requestNodeDisconnect(nodeId2, 
DisconnectionCode.USER_DISCONNECTED, "Unit Test");
     }
 
 
@@ -391,10 +381,10 @@ public class TestNodeClusterCoordinator {
         coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         // wait for the status change message and clear it
-        while (nodeStatusChangeMessages.size() < 2) {
+        while (nodeStatuses.size() < 2) {
             Thread.sleep(10L);
         }
-        nodeStatusChangeMessages.clear();
+        nodeStatuses.clear();
 
         final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, 
nodeId1, NodeConnectionState.DISCONNECTED,
             DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null);
@@ -405,7 +395,7 @@ public class TestNodeClusterCoordinator {
 
         // Ensure that no status change message was send
         Thread.sleep(1000);
-        assertTrue(nodeStatusChangeMessages.isEmpty());
+        assertTrue(nodeStatuses.isEmpty());
 
         // Status should not have changed because our status id is too small.
         NodeConnectionStatus curStatus = 
coordinator.getConnectionStatus(nodeId1);
@@ -431,51 +421,51 @@ public class TestNodeClusterCoordinator {
 
         coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
         // wait for the status change message and clear it
-        while (nodeStatusChangeMessages.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(10L);
         }
-        nodeStatusChangeMessages.clear();
+        nodeStatuses.clear();
 
         coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, 
NodeConnectionState.CONNECTED, Collections.emptySet()));
         // wait for the status change message and clear it
-        while (nodeStatusChangeMessages.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(10L);
         }
-        nodeStatusChangeMessages.clear();
+        nodeStatuses.clear();
 
         // Update role of node 1 to primary node
         coordinator.updateNodeRoles(nodeId1, 
Collections.singleton(ClusterRoles.PRIMARY_NODE));
 
         // wait for the status change message
-        while (nodeStatusChangeMessages.isEmpty()) {
+        while (nodeStatuses.isEmpty()) {
             Thread.sleep(10L);
         }
         // verify the message
-        final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0);
-        assertNotNull(msg);
-        assertEquals(nodeId1, msg.getNodeId());
-        assertEquals(NodeConnectionState.CONNECTED, 
msg.getNodeConnectionStatus().getState());
-        assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), 
msg.getNodeConnectionStatus().getRoles());
-        nodeStatusChangeMessages.clear();
+        final NodeConnectionStatus status = nodeStatuses.get(0);
+        assertNotNull(status);
+        assertEquals(nodeId1, status.getNodeIdentifier());
+        assertEquals(NodeConnectionState.CONNECTED, status.getState());
+        assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), 
status.getRoles());
+        nodeStatuses.clear();
 
         // Update role of node 2 to primary node. This should trigger 2 status 
changes -
         // node 1 should lose primary role & node 2 should gain it
         coordinator.updateNodeRoles(nodeId2, 
Collections.singleton(ClusterRoles.PRIMARY_NODE));
 
         // wait for the status change message
-        while (nodeStatusChangeMessages.size() < 2) {
+        while (nodeStatuses.size() < 2) {
             Thread.sleep(10L);
         }
 
-        final NodeStatusChangeMessage msg1 = nodeStatusChangeMessages.get(0);
-        final NodeStatusChangeMessage msg2 = nodeStatusChangeMessages.get(1);
-        final NodeStatusChangeMessage id1Msg = 
(msg1.getNodeId().equals(nodeId1)) ? msg1 : msg2;
-        final NodeStatusChangeMessage id2Msg = 
(msg1.getNodeId().equals(nodeId2)) ? msg1 : msg2;
+        final NodeConnectionStatus status1 = nodeStatuses.get(0);
+        final NodeConnectionStatus status2 = nodeStatuses.get(1);
+        final NodeConnectionStatus id1Msg = 
(status1.getNodeIdentifier().equals(nodeId1)) ? status1 : status2;
+        final NodeConnectionStatus id2Msg = 
(status1.getNodeIdentifier().equals(nodeId2)) ? status1 : status2;
 
         assertNotSame(id1Msg, id2Msg);
 
-        assertTrue(id1Msg.getNodeConnectionStatus().getRoles().isEmpty());
-        assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), 
id2Msg.getNodeConnectionStatus().getRoles());
+        assertTrue(id1Msg.getRoles().isEmpty());
+        assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), 
id2Msg.getRoles());
     }
 
 
@@ -513,7 +503,6 @@ public class TestNodeClusterCoordinator {
         assertEquals(conflictingId.getSocketPort(), 
conflictingNodeId.getSocketPort());
     }
 
-
     private NodeIdentifier createNodeId(final int index) {
         return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + 
index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, 
false);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d6e9308..30e382d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3260,7 +3260,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private void registerForClusterCoordinator() {
         leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new 
LeaderElectionStateChangeListener() {
             @Override
-            public void onLeaderRelinquish() {
+            public synchronized void onLeaderRelinquish() {
                 heartbeatMonitor.stop();
 
                 if (clusterCoordinator != null) {
@@ -3269,7 +3269,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             }
 
             @Override
-            public void onLeaderElection() {
+            public synchronized void onLeaderElection() {
                 heartbeatMonitor.start();
 
                 if (clusterCoordinator != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
index e2b63a2..1786bce 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
@@ -98,7 +98,7 @@ public class StandardNiFiContentAccess implements 
ContentAccess {
                     throw new NoClusterCoordinatorException();
                 }
                 final Set<NodeIdentifier> coordinatorNodes = 
Collections.singleton(coordinatorNode);
-                nodeResponse = requestReplicator.replicate(coordinatorNodes, 
HttpMethod.GET, dataUri, parameters, headers, false).awaitMergedResponse();
+                nodeResponse = requestReplicator.replicate(coordinatorNodes, 
HttpMethod.GET, dataUri, parameters, headers, false, 
true).awaitMergedResponse();
             } catch (InterruptedException e) {
                 throw new IllegalClusterStateException("Interrupted while 
waiting for a response from node");
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index 641ed38..fd3c474 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -297,7 +297,7 @@ public class StandardNiFiWebConfigurationContext implements 
NiFiWebConfiguration
         }
 
         final Set<NodeIdentifier> coordinatorNodes = 
Collections.singleton(coordinatorNode);
-        return requestReplicator.replicate(coordinatorNodes, method, uri, 
entity, headers, false).awaitMergedResponse();
+        return requestReplicator.replicate(coordinatorNodes, method, uri, 
entity, headers, false, true).awaitMergedResponse();
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index a49ed5d..6a9e1d0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -565,11 +565,11 @@ public abstract class ApplicationResource {
                 // If we are to replicate directly to the nodes, we need to 
indicate that the replication source is
                 // the cluster coordinator so that the node knows to service 
the request.
                 final Set<NodeIdentifier> targetNodes = 
Collections.singleton(nodeId);
-                return requestReplicator.replicate(targetNodes, method, path, 
entity, headers, true).awaitMergedResponse().getResponse();
+                return requestReplicator.replicate(targetNodes, method, path, 
entity, headers, true, true).awaitMergedResponse().getResponse();
             } else {
                 
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, 
nodeId.getId());
                 return 
requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), 
method,
-                    path, entity, headers, 
false).awaitMergedResponse().getResponse();
+                    path, entity, headers, false, 
true).awaitMergedResponse().getResponse();
             }
         } catch (final InterruptedException ie) {
             return 
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + 
method + " " + path + " was interrupted").type("text/plain").build();
@@ -589,23 +589,38 @@ public abstract class ApplicationResource {
         return clusterCoordinator.isActiveClusterCoordinator() ? 
ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
     }
 
+
     protected Response replicate(final String method, final NodeIdentifier 
targetNode) {
+        return replicate(method, targetNode, getRequestParameters());
+    }
+
+    protected Response replicate(final String method, final NodeIdentifier 
targetNode, final Object entity) {
         try {
             // Determine whether we should replicate only to the cluster 
coordinator, or if we should replicate directly
             // to the cluster nodes themselves.
             if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
                 final Set<NodeIdentifier> nodeIds = 
Collections.singleton(targetNode);
-                return getRequestReplicator().replicate(nodeIds, method, 
getAbsolutePath(), getRequestParameters(), getHeaders(), 
true).awaitMergedResponse().getResponse();
+                return getRequestReplicator().replicate(nodeIds, method, 
getAbsolutePath(), entity, getHeaders(), true, 
true).awaitMergedResponse().getResponse();
             } else {
                 final Set<NodeIdentifier> coordinatorNode = 
Collections.singleton(getClusterCoordinatorNode());
                 final Map<String, String> headers = 
getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER,
 targetNode.getId()));
-                return getRequestReplicator().replicate(coordinatorNode, 
method, getAbsolutePath(), getRequestParameters(), headers, 
false).awaitMergedResponse().getResponse();
+                return getRequestReplicator().replicate(coordinatorNode, 
method, getAbsolutePath(), entity, headers, false, 
true).awaitMergedResponse().getResponse();
             }
         } catch (final InterruptedException ie) {
             return 
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + 
method + " " + getAbsolutePath() + " was 
interrupted").type("text/plain").build();
         }
     }
 
+    protected Response replicateToCoordinator(final String method, final 
Object entity) {
+        try {
+            final NodeIdentifier coordinatorNode = getClusterCoordinatorNode();
+            final Set<NodeIdentifier> coordinatorNodes = 
Collections.singleton(coordinatorNode);
+            return getRequestReplicator().replicate(coordinatorNodes, method, 
getAbsolutePath(), entity, getHeaders(), true, 
false).awaitMergedResponse().getResponse();
+        } catch (final InterruptedException ie) {
+            return 
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + 
method + " " + getAbsolutePath() + " was 
interrupted").type("text/plain").build();
+        }
+    }
+
     /**
      * Convenience method for calling {@link #replicate(String, Object)} with 
an entity of
      * {@link #getRequestParameters() getRequestParameters(true)}
@@ -685,7 +700,7 @@ public abstract class ApplicationResource {
         if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
             return requestReplicator.replicate(method, path, entity, 
headers).awaitMergedResponse();
         } else {
-            return 
requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), 
method, path, entity, headers, false).awaitMergedResponse();
+            return 
requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), 
method, path, entity, headers, false, true).awaitMergedResponse();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index 2f73c70..7a5dab7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -548,6 +548,10 @@ public class ControllerResource extends 
ApplicationResource {
                     + "not equal the node id of the requested resource (%s).", 
requestNodeDTO.getNodeId(), id));
         }
 
+        if (isReplicateRequest()) {
+            return replicateToCoordinator(HttpMethod.PUT, nodeEntity);
+        }
+
         // update the node
         final NodeDTO node = serviceFacade.updateNode(requestNodeDTO);
 
@@ -600,6 +604,10 @@ public class ControllerResource extends 
ApplicationResource {
             throw new IllegalClusterResourceRequestException("Only a node 
connected to a cluster can process the request.");
         }
 
+        if (isReplicateRequest()) {
+            return replicateToCoordinator(HttpMethod.DELETE, 
getRequestParameters());
+        }
+
         serviceFacade.deleteNode(id);
 
         // create the response entity

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
index 8dfe417..26a708c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
@@ -168,7 +168,7 @@ public class CountersResource extends ApplicationResource {
                     nodeResponse = 
getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), 
getRequestParameters(), getHeaders()).awaitMergedResponse();
                 } else {
                     final Set<NodeIdentifier> coordinatorNode = 
Collections.singleton(getClusterCoordinatorNode());
-                    nodeResponse = 
getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(), getHeaders(), 
false).awaitMergedResponse();
+                    nodeResponse = 
getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(), getHeaders(), false, 
true).awaitMergedResponse();
                 }
 
                 final CountersEntity entity = (CountersEntity) 
nodeResponse.getUpdatedEntity();

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 1ea3736..a4933a5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -1980,7 +1980,7 @@ public class ProcessGroupResource extends 
ApplicationResource {
                 return getRequestReplicator().replicate(HttpMethod.POST, 
importUri, entity, 
getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
             } else {
                 final Set<NodeIdentifier> coordinatorNode = 
Collections.singleton(getClusterCoordinatorNode());
-                return getRequestReplicator().replicate(coordinatorNode, 
HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), 
false).awaitMergedResponse().getResponse();
+                return getRequestReplicator().replicate(coordinatorNode, 
HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false, 
true).awaitMergedResponse().getResponse();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7779af69/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
index 8a093d0..d9db992 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
@@ -147,7 +147,7 @@ public class SystemDiagnosticsResource extends 
ApplicationResource {
                     nodeResponse = 
getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), 
getRequestParameters(), getHeaders()).awaitMergedResponse();
                 } else {
                     final Set<NodeIdentifier> coordinatorNode = 
Collections.singleton(getClusterCoordinatorNode());
-                    nodeResponse = 
getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(), getHeaders(), 
false).awaitMergedResponse();
+                    nodeResponse = 
getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(), getHeaders(), false, 
true).awaitMergedResponse();
                 }
 
                 final SystemDiagnosticsEntity entity = 
(SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();

Reply via email to