Repository: nifi
Updated Branches:
  refs/heads/master 916292994 -> 76a4a2c48


NIFI-2544: Created integration tests for clustering and addressed a few minor bugs that were found in doing so

This closes #832.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/76a4a2c4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/76a4a2c4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/76a4a2c4

Branch: refs/heads/master
Commit: 76a4a2c48b154719c3ea750b14d568f9998069ba
Parents: 9162929
Author: Mark Payne <[email protected]>
Authored: Tue Aug 9 16:50:04 2016 -0400
Committer: Bryan Bende <[email protected]>
Committed: Thu Aug 11 10:43:38 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/util/NiFiProperties.java    |   6 +
 .../impl/NodeProtocolSenderListener.java        |   6 +-
 ...andardClusterCoordinationProtocolSender.java |   9 +-
 .../nifi-framework-cluster/pom.xml              |   5 +
 .../ClusterProtocolHeartbeatMonitor.java        |  11 +-
 .../node/CuratorNodeProtocolSender.java         |  16 +-
 .../node/NodeClusterCoordinator.java            |  54 +++-
 .../org/apache/nifi/cluster/ReportedEvent.java  |  45 +++
 .../heartbeat/TestAbstractHeartbeatMonitor.java |  25 +-
 .../node/TestNodeClusterCoordinator.java        |   6 +-
 .../nifi/cluster/integration/Cluster.java       | 114 +++++++
 .../integration/ClusterConnectionIT.java        | 238 ++++++++++++++
 .../nifi/cluster/integration/ClusterUtils.java  |  52 +++
 .../apache/nifi/cluster/integration/Node.java   | 320 +++++++++++++++++++
 .../cluster/integration/NopStateProvider.java   | 115 +++++++
 .../src/test/resources/conf/nifi.properties     |  21 +-
 .../test/resources/conf/state-management.xml    |  57 ++++
 .../src/test/resources/logback-test.xml         |   5 +
 .../apache/nifi/controller/FlowController.java  |   2 +-
 .../nifi/controller/StandardFlowService.java    |   6 +-
 .../election/CuratorLeaderElectionManager.java  |  24 +-
 21 files changed, 1077 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 90115d5..bbb3998 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -247,6 +247,12 @@ public class NiFiProperties extends Properties {
         super();
     }
 
+    public NiFiProperties copy() {
+        final NiFiProperties copy = new NiFiProperties();
+        copy.putAll(this);
+        return copy;
+    }
+
     /**
      * Factory method to create an instance of the {@link NiFiProperties}. This
      * method employs a standard singleton pattern by caching the instance if it

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index cc40331..0fd2517 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -46,16 +46,18 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
     @Override
     public void stop() throws IOException {
         if (!isRunning()) {
-            throw new IllegalStateException("Instance is already stopped.");
+            return;
         }
+
         listener.stop();
     }
 
     @Override
     public void start() throws IOException {
         if (isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
+            return;
         }
+
         listener.start();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
index e809961..b9ff082 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
@@ -67,9 +67,14 @@ public class StandardClusterCoordinationProtocolSender 
implements ClusterCoordin
 
     private final ProtocolContext<ProtocolMessage> protocolContext;
     private final SocketConfiguration socketConfiguration;
+    private final int maxThreadsPerRequest;
     private int handshakeTimeoutSeconds;
 
     public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
+        this(socketConfiguration, protocolContext, NiFiProperties.getInstance().getClusterNodeProtocolThreads());
+    }
+
+    public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext, final int maxThreadsPerRequest) {
         if (socketConfiguration == null) {
             throw new IllegalArgumentException("Socket configuration may not 
be null.");
         } else if (protocolContext == null) {
@@ -78,6 +83,7 @@ public class StandardClusterCoordinationProtocolSender 
implements ClusterCoordin
         this.socketConfiguration = socketConfiguration;
         this.protocolContext = protocolContext;
         this.handshakeTimeoutSeconds = -1;  // less than zero denotes variable 
not configured
+        this.maxThreadsPerRequest = maxThreadsPerRequest;
     }
 
     @Override
@@ -233,8 +239,7 @@ public class StandardClusterCoordinationProtocolSender 
implements ClusterCoordin
             return;
         }
 
-        final NiFiProperties properties = NiFiProperties.getInstance();
-        final int numThreads = Math.min(nodesToNotify.size(), properties.getClusterNodeProtocolThreads());
+        final int numThreads = Math.min(nodesToNotify.size(), maxThreadsPerRequest);
 
         final byte[] msgBytes;
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index f74d1ae..79b8d6d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -142,6 +142,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 09dccad..d2d81d1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -113,8 +113,13 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
     @Override
     public void onStart() {
         final RetryPolicy retryPolicy = new RetryForever(5000);
-        curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
-            zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy);
+        curatorClient = CuratorFrameworkFactory.builder()
+            .connectString(zkClientConfig.getConnectString())
+            .sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis())
+            .connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis())
+            .retryPolicy(retryPolicy)
+            .defaultData(new byte[0])
+            .build();
         curatorClient.start();
 
         // We don't know what the heartbeats look like for the nodes, since we 
were just elected to monitoring
@@ -149,7 +154,7 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
                             }
 
                             
curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, 
heartbeatAddress.getBytes(StandardCharsets.UTF_8));
-                            logger.info("Successfully created node in ZooKeeper with path {}", path);
+                            logger.info("Successfully published address as heartbeat monitor address at path {} with value {}", path, heartbeatAddress);
 
                             return;
                         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
index e893c3a..daa3e5c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
@@ -20,18 +20,19 @@ package org.apache.nifi.cluster.coordination.node;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
+import java.util.Properties;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
 import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.util.NiFiProperties;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -49,7 +50,7 @@ public class CuratorNodeProtocolSender extends 
AbstractNodeProtocolSender {
     private InetSocketAddress coordinatorAddress;
 
 
-    public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, 
final ProtocolContext<ProtocolMessage> protocolContext, final NiFiProperties 
properties) {
+    public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, 
final ProtocolContext<ProtocolMessage> protocolContext, final Properties 
properties) {
         super(socketConfig, protocolContext);
         zkConfig = ZooKeeperClientConfig.createConfig(properties);
         coordinatorPath = zkConfig.resolvePath("cluster/nodes/coordinator");
@@ -75,9 +76,12 @@ public class CuratorNodeProtocolSender extends 
AbstractNodeProtocolSender {
                 }
             }).forPath(coordinatorPath);
 
+            if (coordinatorAddressBytes == null || coordinatorAddressBytes.length == 0) {
+                throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
+            }
+
             final String address = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
 
-            logger.info("Determined that Cluster Coordinator is located at {}; 
will use this address for sending heartbeat messages", address);
             final String[] splits = address.split(":");
             if (splits.length != 2) {
                 final String message = String.format("Attempted to determine 
Cluster Coordinator address. Zookeeper indicates "
@@ -86,6 +90,8 @@ public class CuratorNodeProtocolSender extends 
AbstractNodeProtocolSender {
                 throw new ProtocolException(message);
             }
 
+            logger.info("Determined that Cluster Coordinator is located at {}; 
will use this address for sending heartbeat messages", address);
+
             final String hostname = splits[0];
             final int port;
             try {
@@ -105,7 +111,9 @@ public class CuratorNodeProtocolSender extends 
AbstractNodeProtocolSender {
             return socketAddress;
         } catch (final NoNodeException nne) {
             logger.info("No node has yet been elected Cluster Coordinator. 
Cannot establish connection to cluster yet.");
-            throw new ProtocolException("No node has yet been elected Cluster 
Coordinator. Cannot establish connection to cluster yet.");
+            throw new NoClusterCoordinatorException("No node has yet been 
elected Cluster Coordinator. Cannot establish connection to cluster yet.");
+        } catch (final NoClusterCoordinatorException ncce) {
+            throw ncce;
         } catch (Exception e) {
             throw new IOException("Unable to determine Cluster Coordinator 
from ZooKeeper", e);
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 0c94a66..68c0a3a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -49,6 +49,8 @@ import org.apache.nifi.cluster.event.NodeEvent;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import 
org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.protocol.ComponentRevision;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
@@ -71,6 +73,7 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.web.revision.RevisionManager;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -98,6 +101,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     private volatile FlowService flowService;
     private volatile boolean connected;
     private volatile String coordinatorAddress;
+    private volatile boolean closed = false;
 
     private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> 
nodeStatuses = new ConcurrentHashMap<>();
     private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> 
nodeEvents = new ConcurrentHashMap<>();
@@ -123,14 +127,16 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         senderListener.addHandler(this);
     }
 
-    // method is synchronized because it modifies local node state and then 
broadcasts the change. We synchronize any time that this
-    // is done so that we don't have an issue where we create a 
NodeConnectionStatus, then another thread creates one and sends it
-    // before the first one is sent (as this results in the first status 
having a larger id, which means that the first status is never
-    // seen by other nodes).
     @Override
-    public synchronized void shutdown() {
+    public void shutdown() {
+        if (closed) {
+            return;
+        }
+
+        closed = true;
+
         final NodeConnectionStatus shutdownStatus = new 
NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
-        updateNodeStatus(shutdownStatus);
+        updateNodeStatus(shutdownStatus, false);
         logger.info("Successfully notified other nodes that I am shutting 
down");
 
         curatorClient.close();
@@ -139,6 +145,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     @Override
     public void setLocalNodeIdentifier(final NodeIdentifier nodeId) {
         this.nodeId = nodeId;
+        nodeStatuses.computeIfAbsent(nodeId, id -> new 
NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED));
     }
 
     @Override
@@ -159,6 +166,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         while (localNodeId == null) {
             localNodeId = fetchNodeId.get();
             if (localNodeId == null) {
+                if (closed) {
+                    return null;
+                }
+
                 try {
                     Thread.sleep(100L);
                 } catch (final InterruptedException ie) {
@@ -188,6 +199,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
             logger.info("Determined that Cluster Coordinator is located at 
{}", address);
             return address;
+        } catch (final KeeperException.NoNodeException nne) {
+            throw new NoClusterCoordinatorException();
         } catch (Exception e) {
             throw new IOException("Unable to determine Cluster Coordinator 
from ZooKeeper", e);
         }
@@ -395,7 +408,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         while (!updated) {
             final NodeConnectionStatus currentStatus = 
nodeStatuses.get(nodeId);
             if (currentStatus == null) {
-                throw new IllegalStateException("Cannot update roles for " + 
nodeId + " to " + roles + " because the node is not part of this cluster");
+                throw new UnknownNodeException("Cannot update roles for " + 
nodeId + " to " + roles + " because the node is not part of this cluster");
             }
 
             if (currentStatus.getRoles().equals(roles)) {
@@ -531,9 +544,15 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         final String electedNodeAddress;
         try {
             electedNodeAddress = getElectedActiveCoordinatorAddress();
+        } catch (final NoClusterCoordinatorException ncce) {
+            logger.debug("There is currently no elected active Cluster 
Coordinator");
+            return null;
         } catch (final IOException ioe) {
             if (warnOnError) {
-                logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe);
+                logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently: " + ioe);
+                if (logger.isDebugEnabled()) {
+                    logger.warn("", ioe);
+                }
             }
 
             return null;
@@ -644,6 +663,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
      */
     // visible for testing.
     void updateNodeStatus(final NodeConnectionStatus status) {
+        updateNodeStatus(status, true);
+    }
+
+    void updateNodeStatus(final NodeConnectionStatus status, final boolean 
waitForCoordinator) {
         final NodeIdentifier nodeId = status.getNodeIdentifier();
 
         // In this case, we are using nodeStatuses.put() instead of getting 
the current value and
@@ -668,14 +691,14 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
                 logger.debug("Notifying cluster coordinator that node status 
changed from {} to {}", currentStatus, status);
             }
 
-            notifyOthersOfNodeStatusChange(status, notifyAllNodes);
+            notifyOthersOfNodeStatusChange(status, notifyAllNodes, 
waitForCoordinator);
         } else {
             logger.debug("Not notifying other nodes that status changed 
because previous state of {} is same as new state of {}", currentState, 
status.getState());
         }
     }
 
     void notifyOthersOfNodeStatusChange(final NodeConnectionStatus 
updatedStatus) {
-        notifyOthersOfNodeStatusChange(updatedStatus, 
isActiveClusterCoordinator());
+        notifyOthersOfNodeStatusChange(updatedStatus, 
isActiveClusterCoordinator(), true);
     }
 
     /**
@@ -684,7 +707,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
      * @param updatedStatus the updated status for a node in the cluster
      * @param notifyAllNodes if <code>true</code> will notify all nodes. If 
<code>false</code>, will notify only the cluster coordinator
      */
-    void notifyOthersOfNodeStatusChange(final NodeConnectionStatus 
updatedStatus, final boolean notifyAllNodes) {
+    void notifyOthersOfNodeStatusChange(final NodeConnectionStatus 
updatedStatus, final boolean notifyAllNodes, final boolean waitForCoordinator) {
         // If this node is the active cluster coordinator, then we are going 
to replicate to all nodes.
         // Otherwise, get the active coordinator (or wait for one to become 
active) and then notify the coordinator.
         final Set<NodeIdentifier> nodesToNotify;
@@ -693,8 +716,14 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
             // Do not notify ourselves because we already know about the 
status update.
             nodesToNotify.remove(getLocalNodeIdentifier());
-        } else {
+        } else if (waitForCoordinator) {
             nodesToNotify = 
Collections.singleton(waitForElectedClusterCoordinator());
+        } else {
+            final NodeIdentifier nodeId = getElectedActiveCoordinatorNode();
+            if (nodeId == null) {
+                return;
+            }
+            nodesToNotify = Collections.singleton(nodeId);
         }
 
         final NodeStatusChangeMessage message = new NodeStatusChangeMessage();
@@ -878,6 +907,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
             if (updated) {
                 logger.info("Status of {} changed from {} to {}", 
statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
+                logger.debug("State of cluster nodes is now {}", nodeStatuses);
 
                 final NodeConnectionStatus status = 
statusChangeMessage.getNodeConnectionStatus();
                 final String summary = summarizeStatusChange(oldStatus, 
status);

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/ReportedEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/ReportedEvent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/ReportedEvent.java
new file mode 100644
index 0000000..8fc808d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/ReportedEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.reporting.Severity;
+
+public class ReportedEvent {
+    private final NodeIdentifier nodeId;
+    private final Severity severity;
+    private final String event;
+
+    public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) {
+        this.nodeId = nodeId;
+        this.severity = severity;
+        this.event = event;
+    }
+
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public Severity getSeverity() {
+        return severity;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 0f1ce20..5086dc0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import org.apache.nifi.cluster.ReportedEvent;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
@@ -322,30 +323,6 @@ public class TestAbstractHeartbeatMonitor {
         }
     }
 
-    public static class ReportedEvent {
-        private final NodeIdentifier nodeId;
-        private final Severity severity;
-        private final String event;
-
-        public ReportedEvent(NodeIdentifier nodeId, Severity severity, String 
event) {
-            this.nodeId = nodeId;
-            this.severity = severity;
-            this.event = event;
-        }
-
-        public NodeIdentifier getNodeId() {
-            return nodeId;
-        }
-
-        public Severity getSeverity() {
-            return severity;
-        }
-
-        public String getEvent() {
-            return event;
-        }
-    }
-
 
     private static class TestFriendlyHeartbeatMonitor extends 
AbstractHeartbeatMonitor {
         private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new 
HashMap<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 319bd84..2f034b3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -77,7 +77,7 @@ public class TestNodeClusterCoordinator {
 
         coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, revisionManager, createProperties()) {
             @Override
-            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes) {
+            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
                 nodeStatuses.add(updatedStatus);
             }
         };
@@ -132,7 +132,7 @@ public class TestNodeClusterCoordinator {
 
         final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties()) {
             @Override
-            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes) {
+            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
         };
 
@@ -170,7 +170,7 @@ public class TestNodeClusterCoordinator {
 
         final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, 
createProperties()) {
             @Override
-            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes) {
+            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
         };
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
new file mode 100644
index 0000000..dbd8c00
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.integration;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test harness that owns an embedded ZooKeeper {@link TestingServer} and keeps track of
+ * the {@link Node}s that were created against it.
+ */
+public class Cluster {
+    private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
+
+    private final Set<Node> nodes = new HashSet<>();
+    private final TestingServer zookeeperServer;
+
+    /**
+     * Creates the embedded ZooKeeper server. Any failure is rethrown as a RuntimeException.
+     */
+    public Cluster() {
+        try {
+            zookeeperServer = new TestingServer();
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    /**
+     * Starts the ZooKeeper server and blocks until its connect string is available.
+     *
+     * @throws RuntimeException if the server fails to start or the calling thread is interrupted while waiting
+     */
+    public void start() {
+        try {
+            zookeeperServer.start();
+        } catch (final RuntimeException e) {
+            throw e;
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        while (getZooKeeperConnectString() == null) {
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException e) {
+                // Do not keep looping after an interrupt (e.g., a JUnit timeout): restore the flag and give up.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while waiting for ZooKeeper Server to start", e);
+            }
+        }
+
+        logger.info("Start ZooKeeper Server on Port {}, with temporary directory {}", zookeeperServer.getPort(), zookeeperServer.getTempDirectory());
+    }
+
+    /**
+     * Stops every node that is still running and then shuts down the ZooKeeper server.
+     * Failures are logged instead of propagated so that one failure does not prevent the
+     * remaining resources from being released.
+     */
+    public void stop() {
+        for (final Node node : nodes) {
+            try {
+                if (node.isRunning()) {
+                    node.stop();
+                }
+            } catch (Exception e) {
+                logger.error("Failed to shut down " + node, e);
+            }
+        }
+
+        // stop() and close() are attempted independently so that a failure to stop still lets close() run.
+        try {
+            zookeeperServer.stop();
+        } catch (final Exception e) {
+            logger.warn("Failed to stop ZooKeeper Server", e);
+        }
+
+        try {
+            zookeeperServer.close();
+        } catch (final Exception e) {
+            logger.warn("Failed to close ZooKeeper Server", e);
+        }
+    }
+
+
+    /**
+     * @return the connect string reported by the embedded ZooKeeper server
+     */
+    public String getZooKeeperConnectString() {
+        return zookeeperServer.getConnectString();
+    }
+
+    /**
+     * @return a read-only view of the nodes created through {@link #createNode()}
+     */
+    public Set<Node> getNodes() {
+        return Collections.unmodifiableSet(nodes);
+    }
+
+
+    /**
+     * Creates and starts a new node that uses this cluster's ZooKeeper server.
+     * Note that this sets the ZooKeeper connect string and the "is node" flag on the shared
+     * {@link NiFiProperties} instance before handing a copy of it to the new node.
+     *
+     * @return the started node, which is also registered with this cluster
+     */
+    public Node createNode() {
+        NiFiProperties.getInstance().setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString());
+        NiFiProperties.getInstance().setProperty(NiFiProperties.CLUSTER_IS_NODE, "true");
+
+        final NiFiProperties properties = NiFiProperties.getInstance().copy();
+        final Node node = new Node(properties);
+        node.start();
+        nodes.add(node);
+
+        return node;
+    }
+
+    /**
+     * Waits until some node of this cluster reports the Cluster Coordinator role.
+     *
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     * @return the first node found that holds the role
+     * @throws AssertionError if no node holds the role within the given amount of time
+     */
+    public Node waitForClusterCoordinator(final long time, final TimeUnit timeUnit) {
+        return ClusterUtils.waitUntilNonNull(time, timeUnit,
+            () -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null));
+    }
+
+    /**
+     * Waits until some node of this cluster reports the Primary Node role.
+     *
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     * @return the first node found that holds the role
+     * @throws AssertionError if no node holds the role within the given amount of time
+     */
+    public Node waitForPrimaryNode(final long time, final TimeUnit timeUnit) {
+        return ClusterUtils.waitUntilNonNull(time, timeUnit,
+            () -> getNodes().stream().filter(node -> node.getRoles().contains(ClusterRoles.PRIMARY_NODE)).findFirst().orElse(null));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
new file mode 100644
index 0000000..6881ca2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.integration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration tests that start one or more {@link Node}s against an embedded ZooKeeper
+ * server (see {@link Cluster}) and verify connection, role election and heartbeat handling.
+ */
+public class ClusterConnectionIT {
+
+    /**
+     * Points NiFi at the properties file used by these tests.
+     */
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("nifi.properties.file.path", "src/test/resources/conf/nifi.properties");
+    }
+
+    /**
+     * A single node should connect and be elected both Cluster Coordinator and Primary Node.
+     */
+    @Test(timeout = 20000)
+    public void testSingleNode() throws InterruptedException {
+        final Cluster cluster = new Cluster();
+        cluster.start();
+
+        try {
+            final Node firstNode = cluster.createNode();
+            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
+
+            firstNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS);
+            firstNode.waitUntilElectedForRole(ClusterRoles.PRIMARY_NODE, 10, TimeUnit.SECONDS);
+        } finally {
+            cluster.stop();
+        }
+    }
+
+    /**
+     * Three nodes should all connect, and a Cluster Coordinator and a Primary Node should be elected.
+     */
+    @Test(timeout = 60000)
+    public void testThreeNodeCluster() throws InterruptedException {
+        final Cluster cluster = new Cluster();
+        cluster.start();
+
+        try {
+            final Node firstNode = cluster.createNode();
+            final Node secondNode = cluster.createNode();
+            final Node thirdNode = cluster.createNode();
+
+            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 1 Connected ****");
+            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 2 Connected ****");
+            thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 3 Connected ****");
+
+            final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
+            final Node primaryNode = cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
+            System.out.println("\n\n");
+            System.out.println("Cluster Coordinator = " + clusterCoordinator);
+            System.out.println("Primary Node = " + primaryNode);
+            System.out.println("\n\n");
+        } finally {
+            cluster.stop();
+        }
+    }
+
+    /**
+     * When the current Cluster Coordinator is stopped, the remaining node should take over the role.
+     */
+    @Test(timeout = 60000)
+    public void testNewCoordinatorElected() throws IOException {
+        final Cluster cluster = new Cluster();
+        cluster.start();
+
+        try {
+            final Node firstNode = cluster.createNode();
+            final Node secondNode = cluster.createNode();
+
+            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 1 Connected ****");
+            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 2 Connected ****");
+
+            final Node clusterCoordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
+            clusterCoordinator.stop();
+
+            final Node otherNode = firstNode == clusterCoordinator ? secondNode : firstNode;
+            otherNode.waitUntilElectedForRole(ClusterRoles.CLUSTER_COORDINATOR, 10, TimeUnit.SECONDS);
+        } finally {
+            cluster.stop();
+        }
+    }
+
+    /**
+     * A node that is stopped and restarted should reconnect, and afterwards every node
+     * should see all three nodes as connected.
+     */
+    @Test(timeout = 60000)
+    public void testReconnectGetsCorrectClusterTopology() throws IOException {
+        final Cluster cluster = new Cluster();
+        cluster.start();
+
+        try {
+            final Node firstNode = cluster.createNode();
+            final Node secondNode = cluster.createNode();
+            final Node thirdNode = cluster.createNode();
+
+            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 1 Connected ****");
+            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 2 Connected ****");
+            thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 3 Connected ****");
+
+            // shutdown node
+            secondNode.stop();
+
+            System.out.println("\n\nNode 2 Shut Down\n\n");
+
+            // wait for node 1 and 3 to recognize that node 2 is gone
+            Stream.of(firstNode, thirdNode).forEach(node -> {
+                node.assertNodeDisconnects(secondNode.getIdentifier(), 5, TimeUnit.SECONDS);
+            });
+
+            // restart node
+            secondNode.start();
+            System.out.println("\n\nNode 2 Restarted\n\n");
+
+            secondNode.waitUntilConnected(20, TimeUnit.SECONDS);
+            System.out.println("\n\nNode 2 Reconnected\n\n");
+
+            // wait for all 3 nodes to agree that node 2 is connected. Each node's own view must be
+            // consulted here; asking only the first node would not prove that the others agree.
+            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+                node.assertNodeConnects(secondNode.getIdentifier(), 5, TimeUnit.SECONDS);
+            });
+
+            // Ensure that all 3 nodes see a cluster of 3 connected nodes.
+            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+                node.assertNodeIsConnected(firstNode.getIdentifier());
+                node.assertNodeIsConnected(secondNode.getIdentifier());
+                node.assertNodeIsConnected(thirdNode.getIdentifier());
+            });
+
+            // Ensure that we get both a cluster coordinator and a primary node elected
+            cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
+            cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
+        } finally {
+            cluster.stop();
+        }
+    }
+
+
+    /**
+     * After all nodes are stopped and started again (in a different order), every node should
+     * reconnect and see all three nodes as connected, and the roles should be elected again.
+     */
+    @Test(timeout = 60000)
+    public void testRestartAllNodes() throws IOException {
+        final Cluster cluster = new Cluster();
+        cluster.start();
+
+        try {
+            final Node firstNode = cluster.createNode();
+            final Node secondNode = cluster.createNode();
+            final Node thirdNode = cluster.createNode();
+
+            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 1 Connected ****");
+            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 2 Connected ****");
+            thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            System.out.println("**** Node 3 Connected ****");
+
+            // shutdown all nodes
+            firstNode.stop();
+            secondNode.stop();
+            thirdNode.stop();
+
+            System.out.println("\n\nRestarting all nodes\n\n");
+            thirdNode.start();
+            firstNode.start();
+            secondNode.start();
+
+            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+                node.waitUntilConnected(10, TimeUnit.SECONDS);
+            });
+
+            // wait for all 3 nodes to agree that node 2 is connected. Each node's own view must be
+            // consulted here; asking only the first node would not prove that the others agree.
+            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+                node.assertNodeConnects(secondNode.getIdentifier(), 5, TimeUnit.SECONDS);
+            });
+
+            // Ensure that all 3 nodes see a cluster of 3 connected nodes.
+            Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
+                node.assertNodeConnects(firstNode.getIdentifier(), 10, TimeUnit.SECONDS);
+                node.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS);
+                node.assertNodeConnects(thirdNode.getIdentifier(), 10, TimeUnit.SECONDS);
+            });
+
+            // Ensure that we get both a cluster coordinator and a primary node elected
+            cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
+            cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
+        } finally {
+            cluster.stop();
+        }
+    }
+
+
+    /**
+     * A node that stops heartbeating should be disconnected by the other node, and should be
+     * connected again once heartbeats resume.
+     */
+    @Test(timeout = 30000)
+    public void testHeartbeatsMonitored() throws IOException {
+        final Cluster cluster = new Cluster();
+        cluster.start();
+
+        try {
+            final Node firstNode = cluster.createNode();
+            final Node secondNode = cluster.createNode();
+
+            firstNode.waitUntilConnected(10, TimeUnit.SECONDS);
+            secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
+
+            secondNode.suspendHeartbeating();
+
+            // Heartbeat interval in nifi.properties is set to 1 sec. This means that the node should be kicked out
+            // due to lack of heartbeat after 8 times this amount of time, or 8 seconds.
+            firstNode.assertNodeDisconnects(secondNode.getIdentifier(), 12, TimeUnit.SECONDS);
+
+            secondNode.resumeHeartbeating();
+            firstNode.assertNodeConnects(secondNode.getIdentifier(), 10, TimeUnit.SECONDS);
+        } finally {
+            cluster.stop();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
new file mode 100644
index 0000000..972d2c7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.integration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
+
+/**
+ * Polling helpers used by the cluster integration tests to wait for a condition to become true.
+ */
+public class ClusterUtils {
+
+    // Pause between polls so that the waiting thread does not spin at full speed and take CPU
+    // time away from the threads that are expected to make the condition come true.
+    private static final long POLL_INTERVAL_MILLIS = 10L;
+
+    /**
+     * Repeatedly evaluates the given test until it returns <code>true</code>.
+     *
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     * @param test the condition to poll
+     * @throws AssertionError if the condition is not met within the given amount of time, or if the
+     *             calling thread is interrupted while waiting
+     */
+    public static void waitUntilConditionMet(final long time, final TimeUnit timeUnit, final BooleanSupplier test) {
+        final long nanosToWait = timeUnit.toNanos(time);
+        final long start = System.nanoTime();
+
+        while (!test.getAsBoolean()) {
+            failIfExpired(start, nanosToWait, time, timeUnit);
+            pauseBeforeRetry();
+        }
+    }
+
+    /**
+     * Repeatedly invokes the given supplier until it returns a non-null value.
+     *
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     * @param test the supplier to poll
+     * @param <T> type of the value being waited for
+     * @return the first non-null value returned by the supplier
+     * @throws AssertionError if no non-null value is supplied within the given amount of time, or if the
+     *             calling thread is interrupted while waiting
+     */
+    public static <T> T waitUntilNonNull(final long time, final TimeUnit timeUnit, final Supplier<T> test) {
+        final long nanosToWait = timeUnit.toNanos(time);
+        final long start = System.nanoTime();
+
+        T returnVal;
+        while ((returnVal = test.get()) == null) {
+            failIfExpired(start, nanosToWait, time, timeUnit);
+            pauseBeforeRetry();
+        }
+
+        return returnVal;
+    }
+
+    // Throws if more than nanosToWait have elapsed since start. Elapsed time is computed by
+    // subtraction because absolute System.nanoTime() values must not be compared directly.
+    private static void failIfExpired(final long start, final long nanosToWait, final long time, final TimeUnit timeUnit) {
+        if (System.nanoTime() - start > nanosToWait) {
+            throw new AssertionError("Condition never occurred after waiting " + time + " " + timeUnit);
+        }
+    }
+
+    // Sleeps briefly between polls. An interrupt ends the wait with the interrupt flag restored.
+    private static void pauseBeforeRetry() {
+        try {
+            Thread.sleep(POLL_INTERVAL_MILLIS);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for condition to occur");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
new file mode 100644
index 0000000..5bfe83c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.integration;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.cluster.ReportedEvent;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import 
org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
+import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
+import org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender;
+import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import 
org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
+import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
+import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
+import 
org.apache.nifi.cluster.protocol.impl.StandardClusterCoordinationProtocolSender;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.StandardFlowService;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.revision.RevisionManager;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+/**
+ * A NiFi cluster node assembled for integration tests from a FlowController, a flow service,
+ * a cluster coordinator and a socket protocol listener, with mocks standing in for the
+ * remaining collaborators. A Node can be started, stopped and started again.
+ */
+public class Node {
+    private final NodeIdentifier nodeId;
+    private final NiFiProperties nodeProperties;
+
+    private final List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
+    private final RevisionManager revisionManager;
+
+    private NodeClusterCoordinator clusterCoordinator;
+    private CuratorNodeProtocolSender protocolSender;
+    private FlowController flowController;
+    private StandardFlowService flowService;
+
+    private ProtocolListener protocolListener;
+
+    private volatile boolean running = false;
+
+    private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true);
+
+    /**
+     * Creates a node on localhost with a random UUID and two ports obtained from {@link #createPort()}.
+     *
+     * @param properties the properties for this node; the protocol and HTTP port entries are overwritten
+     */
+    public Node(final NiFiProperties properties) {
+        this(new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null), properties);
+    }
+
+    /**
+     * Creates a node with the given identifier.
+     *
+     * @param nodeId identifier of this node
+     * @param properties the properties for this node; the protocol and HTTP port entries are overwritten
+     *            with the socket port and API port of the given identifier
+     */
+    public Node(final NodeIdentifier nodeId, final NiFiProperties properties) {
+        this.nodeId = nodeId;
+        this.nodeProperties = properties;
+
+        nodeProperties.setProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT, String.valueOf(nodeId.getSocketPort()));
+        nodeProperties.setProperty(NiFiProperties.WEB_HTTP_PORT, String.valueOf(nodeId.getApiPort()));
+
+        revisionManager = Mockito.mock(RevisionManager.class);
+        Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.<Revision> emptyList());
+    }
+
+
+    /**
+     * Builds fresh protocol sender, cluster coordinator, flow controller and flow service
+     * instances and starts them. Failures are rethrown as RuntimeException.
+     */
+    public synchronized void start() {
+        running = true;
+
+        protocolSender = createNodeProtocolSender();
+        // createClusterCoordinator() also creates and starts the protocol listener used below
+        clusterCoordinator = createClusterCoordinator();
+        clusterCoordinator.setLocalNodeIdentifier(nodeId);
+        clusterCoordinator.setConnected(true);
+
+        final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
+        flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties,
+            null, null, StringEncryptor.createEncryptor(), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY);
+
+        try {
+            flowController.initializeFlow();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        final NodeProtocolSenderListener senderListener = new NodeProtocolSenderListener(protocolSender, protocolListener);
+        try {
+            // Store this node's UUID in local state before the flow service is created
+            flowController.getStateManagerProvider().getStateManager("Cluster Node Configuration").setState(Collections.singletonMap("Node UUID", nodeId.getId()), Scope.LOCAL);
+
+            flowService = StandardFlowService.createClusteredInstance(flowController, nodeProperties, senderListener, clusterCoordinator,
+                StringEncryptor.createEncryptor(), revisionManager, Mockito.mock(Authorizer.class));
+
+            flowService.start();
+            flowService.load(null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Shuts down the flow controller, flow service, cluster coordinator and executor of this node.
+     *
+     * @throws IOException if thrown by one of the components being shut down
+     */
+    public void stop() throws IOException {
+        running = false;
+
+        flowController.shutdown(true);
+        flowService.stop(true);
+
+        clusterCoordinator.shutdown();
+        executor.shutdownNow();
+
+        // protocol listener is closed by flow controller
+    }
+
+    /**
+     * Tells this node's flow controller to stop sending heartbeats.
+     */
+    public void suspendHeartbeating() {
+        flowController.suspendHeartbeats();
+    }
+
+    /**
+     * Tells this node's flow controller to send heartbeats again.
+     */
+    public void resumeHeartbeating() {
+        flowController.resumeHeartbeats();
+    }
+
+    /**
+     * @return the identifier of this node
+     */
+    public NodeIdentifier getIdentifier() {
+        return nodeId;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(nodeId).build();
+    }
+
+    /**
+     * Two nodes are equal if their identifiers are equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof Node)) {
+            return false;
+        }
+
+        return getIdentifier().equals(((Node) obj).getIdentifier());
+    }
+
+    @Override
+    public String toString() {
+        return "Node[id=" + getIdentifier() + ", started=" + isRunning() + "]";
+    }
+
+    /**
+     * @return <code>true</code> if {@link #start()} has been called more recently than {@link #stop()}
+     */
+    public boolean isRunning() {
+        return running;
+    }
+
+    private static int createPort() {
+        // get an unused port. The probing socket is closed again before the port number is used,
+        // so the port is only known to have been free at the time of the probe.
+        while (true) {
+            try (ServerSocket ss = new ServerSocket(0)) {
+                return ss.getLocalPort();
+            } catch (final IOException ignored) {
+                // could not open a socket this time; try again
+            }
+        }
+    }
+
+    /**
+     * @return the roles that this node's coordinator reports for this node, or an empty set
+     *         if no connection status is known for it
+     */
+    public Set<String> getRoles() {
+        final NodeConnectionStatus status = getConnectionStatus();
+        return status == null ? Collections.emptySet() : status.getRoles();
+    }
+
+    /**
+     * @return the connection status that this node's coordinator reports for this node; may be <code>null</code>
+     */
+    public NodeConnectionStatus getConnectionStatus() {
+        return clusterCoordinator.getConnectionStatus(nodeId);
+    }
+
+    @SuppressWarnings("unchecked")
+    private CuratorNodeProtocolSender createNodeProtocolSender() {
+        final SocketConfiguration socketConfig = new SocketConfiguration();
+        socketConfig.setSocketTimeout(3000);
+        socketConfig.setReuseAddress(true);
+
+        final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT);
+        final CuratorNodeProtocolSender protocolSender = new CuratorNodeProtocolSender(socketConfig, protocolContext, nodeProperties);
+        return protocolSender;
+    }
+
+    @SuppressWarnings("unchecked")
+    private ClusterCoordinationProtocolSender createCoordinatorProtocolSender() {
+        final SocketConfiguration socketConfig = new SocketConfiguration();
+        socketConfig.setSocketTimeout(3000);
+        socketConfig.setReuseAddress(true);
+
+        final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT);
+        return new StandardClusterCoordinationProtocolSender(socketConfig, protocolContext, 1);
+    }
+
+    private HeartbeatMonitor createHeartbeatMonitor() {
+        return new ClusterProtocolHeartbeatMonitor(clusterCoordinator, protocolListener, nodeProperties);
+    }
+
+    // Creates and starts the socket protocol listener on this node's protocol port, then builds a
+    // coordinator on top of it whose reported events are recorded in 'reportedEvents'.
+    @SuppressWarnings("unchecked")
+    private NodeClusterCoordinator createClusterCoordinator() {
+        final EventReporter eventReporter = new EventReporter() {
+            @Override
+            public void reportEvent(Severity severity, String category, String message) {
+                reportedEvents.add(new ReportedEvent(nodeId, severity, message));
+            }
+        };
+
+        final ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration();
+        serverSocketConfiguration.setSocketTimeout(5000);
+        final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<>(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        protocolListener = new SocketProtocolListener(3, Integer.parseInt(nodeProperties.getProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT)), serverSocketConfiguration, protocolContext);
+        try {
+            protocolListener.start();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
+        return new NodeClusterCoordinator(protocolSenderListener, eventReporter, null, revisionManager, nodeProperties);
+    }
+
+
+    /**
+     * @return the cluster coordinator created by the most recent call to {@link #start()}
+     */
+    public ClusterCoordinator getClusterCoordinator() {
+        return clusterCoordinator;
+    }
+
+
+    //
+    // Methods for checking conditions
+    //
+    /**
+     * @return <code>true</code> if this node's coordinator reports this node as CONNECTED
+     */
+    public boolean isConnected() {
+        final NodeConnectionStatus status = getConnectionStatus();
+        if (status == null) {
+            return false;
+        }
+
+        return status.getState() == NodeConnectionState.CONNECTED;
+    }
+
+    // Returns true if this node's coordinator reports the given node in the given state. A node for
+    // which no status is known yet is treated as "not in that state" so that callers can keep polling.
+    private boolean isNodeInState(final NodeIdentifier nodeId, final NodeConnectionState expectedState) {
+        final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
+        return status != null && status.getState() == expectedState;
+    }
+
+    //
+    // Methods to wait for conditions
+    //
+    /**
+     * Waits until this node considers itself connected.
+     *
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     */
+    public void waitUntilConnected(final long time, final TimeUnit timeUnit) {
+        ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> isConnected());
+    }
+
+    /**
+     * Waits until this node reports that it holds the given role.
+     *
+     * @param roleName name of the role
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     */
+    public void waitUntilElectedForRole(final String roleName, final long time, final TimeUnit timeUnit) {
+        ClusterUtils.waitUntilConditionMet(time, timeUnit, () -> getRoles().contains(roleName));
+    }
+
+    // Assertions
+    /**
+     * Assert that the node with the given ID connects (According to this node!) within the given amount of time
+     *
+     * @param nodeId id of the node
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     */
+    public void assertNodeConnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) {
+        ClusterUtils.waitUntilConditionMet(time, timeUnit,
+            () -> isNodeInState(nodeId, NodeConnectionState.CONNECTED));
+    }
+
+
+    /**
+     * Assert that the node with the given ID disconnects (According to this node!) within the given amount of time
+     *
+     * @param nodeId id of the node
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     */
+    public void assertNodeDisconnects(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) {
+        ClusterUtils.waitUntilConditionMet(time, timeUnit,
+            () -> isNodeInState(nodeId, NodeConnectionState.DISCONNECTED));
+    }
+
+
+    /**
+     * Asserts that the node with the given ID is currently connected (According to this node!)
+     *
+     * @param nodeId id of the node
+     */
+    public void assertNodeIsConnected(final NodeIdentifier nodeId) {
+        final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
+        // Fail with a descriptive assertion rather than a NullPointerException when the node is unknown
+        Assert.assertNotNull("No connection status known for node " + nodeId, status);
+        Assert.assertEquals(NodeConnectionState.CONNECTED, status.getState());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
new file mode 100644
index 0000000..fb80ab2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.integration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.controller.state.StandardStateMap;
+
+/**
+ * Minimal in-memory {@link StateProvider} for the clustering integration tests.
+ *
+ * <p>State is kept per component in a plain map held by this instance; nothing is persisted.
+ * The provider is deliberately limited: it takes no properties, performs no validation,
+ * always reports version {@code 0} for returned state, and does not support compare-and-set
+ * ({@link #replace(StateMap, Map, String)} always returns {@code false}).</p>
+ *
+ * <p>All access to the backing map is synchronized on this instance.</p>
+ */
+public class NopStateProvider implements StateProvider {
+    private final String id = UUID.randomUUID().toString();
+
+    // Guarded by 'this'. Values are replaced wholesale and never mutated in place, so a map
+    // handed out through getState() cannot change underneath its holder.
+    private final Map<String, Map<String, String>> componentStateMap = new HashMap<>();
+
+    /** No properties are supported, so there is never anything to report. */
+    @Override
+    public Collection<ValidationResult> validate(ValidationContext context) {
+        return Collections.emptyList();
+    }
+
+    /** Always {@code null}: this provider declares no properties. */
+    @Override
+    public PropertyDescriptor getPropertyDescriptor(String name) {
+        return null;
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        // Nothing to do: this provider has no properties.
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return Collections.emptyList();
+    }
+
+    /** Returns a random identifier generated once per instance. */
+    @Override
+    public String getIdentifier() {
+        return id;
+    }
+
+    @Override
+    public void initialize(StateProviderInitializationContext context) throws IOException {
+        // No resources to set up.
+    }
+
+    @Override
+    public void shutdown() {
+        // No resources to release.
+    }
+
+    /**
+     * Replaces the state stored for the given component with a copy of the supplied map.
+     *
+     * @param state the new state; copied, so later changes by the caller are not observed
+     * @param componentId the component whose state is being set
+     */
+    @Override
+    public synchronized void setState(Map<String, String> state, String componentId) throws IOException {
+        // Store a fresh copy instead of clearing/refilling the existing map: a StateMap returned
+        // earlier by getState() must not be altered by a later setState().
+        componentStateMap.put(componentId, new HashMap<>(state));
+    }
+
+    /**
+     * Returns the state currently stored for the given component, or an empty state if none is stored.
+     * Reading state does not create an entry for the component.
+     *
+     * @param componentId the component whose state is requested
+     * @return a StateMap over a private copy of the stored values, always with version 0
+     */
+    @Override
+    public synchronized StateMap getState(String componentId) throws IOException {
+        final Map<String, String> stored = componentStateMap.get(componentId);
+        final Map<String, String> snapshot = (stored == null) ? new HashMap<>() : new HashMap<>(stored);
+        return new StandardStateMap(snapshot, 0L);
+    }
+
+    /**
+     * Compare-and-set is not supported by this test provider.
+     *
+     * @return always {@code false}; the stored state is left untouched
+     */
+    @Override
+    public synchronized boolean replace(StateMap oldValue, Map<String, String> newValue, String componentId) throws IOException {
+        return false;
+    }
+
+    /** Discards any state stored for the given component so that it is no longer returned by {@link #getState(String)}. */
+    @Override
+    public synchronized void clear(String componentId) throws IOException {
+        componentStateMap.remove(componentId);
+    }
+
+    /** Drops the removed component's entry so its state does not linger in the backing map. */
+    @Override
+    public synchronized void onComponentRemoved(String componentId) throws IOException {
+        componentStateMap.remove(componentId);
+    }
+
+    @Override
+    public void enable() {
+        // Always enabled; see isEnabled().
+    }
+
+    @Override
+    public void disable() {
+        // Disabling is not supported; see isEnabled().
+    }
+
+    /** Always {@code true}: enable()/disable() have no effect on this provider. */
+    @Override
+    public boolean isEnabled() {
+        return true;
+    }
+
+    @Override
+    public Scope[] getSupportedScopes() {
+        return new Scope[] {Scope.LOCAL};
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
index 78a649b..44b2a4e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
@@ -30,11 +30,25 @@ nifi.ui.autorefresh.interval=30 sec
 nifi.nar.library.directory=./target/lib
 nifi.nar.working.directory=./target/work/nar/
 
+####################
+# State Management #
+####################
+nifi.state.management.configuration.file=src/test/resources/conf/state-management.xml
+# The ID of the local state provider
+nifi.state.management.provider.local=local-provider
+# The ID of the cluster-wide state provider. This will be ignored if NiFi is 
not clustered but must be populated if running in a cluster.
+nifi.state.management.provider.cluster=zk-provider
+# Specifies whether or not this instance of NiFi should run an embedded 
ZooKeeper server
+nifi.state.management.embedded.zookeeper.start=false
+# Properties file that provides the ZooKeeper properties to use if 
<nifi.state.management.embedded.zookeeper.start> is set to true
+nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
+
 # H2 Settings
 nifi.database.directory=./database_repository
 nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
 
 # FlowFile Repository
+nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.VolatileFlowFileRepository
 nifi.flowfile.repository.directory=./target/test-repo
 nifi.flowfile.repository.partitions=1
 nifi.flowfile.repository.checkpoint.interval=2 mins
@@ -56,10 +70,11 @@ nifi.provenance.repository.max.storage.time=24 hours
 nifi.provenance.repository.max.storage.size=1 GB
 nifi.provenance.repository.rollover.time=30 secs
 nifi.provenance.repository.rollover.size=100 MB
+nifi.provenance.repository.implementation=org.apache.nifi.provenance.MockProvenanceRepository
 
 # Site to Site properties
-nifi.remote.input.socket.port=9990
-nifi.remote.input.secure=true
+nifi.remote.input.socket.port=
+nifi.remote.input.secure=false
 
 # web properties #
 nifi.web.war.directory=./target/lib
@@ -89,7 +104,7 @@ nifi.security.support.new.account.requests=
 nifi.security.default.user.roles=
 
 # cluster common properties (cluster manager and nodes must have same values) #
-nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.heartbeat.interval=1 secs
 nifi.cluster.protocol.is.secure=false
 nifi.cluster.protocol.socket.timeout=30 sec
 nifi.cluster.protocol.connection.handshake.timeout=45 sec

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/state-management.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/state-management.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/state-management.xml
new file mode 100644
index 0000000..6e95b15
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/state-management.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!--
+  This file provides a mechanism for defining and configuring the State 
Providers
+  that should be used for storing state locally and across a NiFi cluster. In 
order
+  to use a specific provider, it must be configured here and its identifier
+  must be specified in the nifi.properties file.
+-->
+<stateManagement>
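+    <!--
+        Local State Provider. In a regular installation this is a State Provider that stores state locally in a
+        configurable directory, and it requires the following property:
+
+        Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it
+                    is important that the directory be copied over to the new version when upgrading NiFi.
+
+        For these tests the local provider configured below is NopStateProvider, which is given no properties,
+        so no Directory is specified here.
+     -->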
+    <local-provider>
+        <id>local-provider</id>
+        <class>org.apache.nifi.cluster.integration.NopStateProvider</class>
+    </local-provider>
+
+    <!--
+        State Provider that is used to store state in ZooKeeper. This Provider 
requires the following properties:
+
+        Root Node - the root node in ZooKeeper where state should be stored. 
The default is '/nifi', but it is advisable to change this to a different value 
if not using
+                   the embedded ZooKeeper server and if multiple NiFi 
instances may all be using the same ZooKeeper Server.
+
+        Connect String - A comma-separated list of host:port pairs to connect 
to ZooKeeper. For example, myhost.mydomain:2181,host2.mydomain:5555,host3:6666
+
+        Session Timeout - Specifies how long this instance of NiFi is allowed 
to be disconnected from ZooKeeper before creating a new ZooKeeper Session. 
Default value is "30 seconds"
+
+        Access Control - Specifies which Access Controls will be applied to 
the ZooKeeper ZNodes that are created by this State Provider. This value must 
be set to one of:
+                            - Open  : ZNodes will be open to any ZooKeeper 
client.
+                            - CreatorOnly  : ZNodes will be accessible only by 
the creator. The creator will have full access to create children, read, write, 
delete, and administer the ZNodes.
+                                             This option is available only if 
access to ZooKeeper is secured via Kerberos or if a Username and Password are 
set.
+    -->
+    <cluster-provider>
+        <id>zk-provider</id>
+        
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
+        <property name="Connect String">localhost:8320</property>
+        <property name="Root Node">/nifi</property>
+        <property name="Session Timeout">10 seconds</property>
+        <property name="Access Control">Open</property>
+    </cluster-provider>
+</stateManagement>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
index 92eb78c..0f5adc8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
@@ -23,6 +23,11 @@
     
     <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
     <logger name="org.apache.nifi" level="INFO"/>
+    <logger name="org.apache.nifi.engine.FlowEngine" level="OFF" />
+    <logger name="org.apache.nifi.cluster.coordination.node" level="DEBUG" />
+    <logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" 
level="OFF" />
+    <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
+    <logger name="org.apache.curator.framework.imps.CuratorFrameworkImpl" 
level="OFF" />
     
     <!-- Logger for managing logging statements for nifi clusters. -->
     <logger name="org.apache.nifi.cluster" level="INFO"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index fdb6f58..2635fc4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -581,7 +581,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
 
         if (configuredForClustering) {
-            leaderElectionManager = new CuratorLeaderElectionManager(4);
+            leaderElectionManager = new CuratorLeaderElectionManager(4, 
properties);
             heartbeater = new ClusterProtocolHeartbeater(protocolSender, 
properties);
 
             // Check if there is already a cluster coordinator elected. If 
not, go ahead

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index b634e74..091e59c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -788,7 +788,11 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                     }
                 } catch (final Exception pe) {
                     // could not create a socket and communicate with manager
-                    logger.warn("Failed to connect to cluster due to: " + pe, 
pe);
+                    logger.warn("Failed to connect to cluster due to: " + pe);
+                    if (logger.isDebugEnabled()) {
+                        logger.warn("", pe);
+                    }
+
                     if (retryOnCommsFailure) {
                         try {
                             Thread.sleep(response == null ? 5000 : 
response.getTryLaterSeconds());

http://git-wip-us.apache.org/repos/asf/nifi/blob/76a4a2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index 9d076c0..7bf7494 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.leader.election;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -48,10 +49,13 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
     private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
     private final Map<String, LeaderElectionStateChangeListener> 
registeredRoles = new HashMap<>();
 
+
     public CuratorLeaderElectionManager(final int threadPoolSize) {
-        leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader 
Election Notification", true);
+        this(threadPoolSize, NiFiProperties.getInstance());
+    }
 
-        final NiFiProperties properties = NiFiProperties.getInstance();
+    public CuratorLeaderElectionManager(final int threadPoolSize, final 
Properties properties) {
+        leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader 
Election Notification", true);
         zkConfig = ZooKeeperClientConfig.createConfig(properties);
     }
 
@@ -65,7 +69,14 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         stopped = false;
 
         final RetryPolicy retryPolicy = new RetryForever(5000);
-        curatorClient = 
CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), 
zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), 
retryPolicy);
+        curatorClient = CuratorFrameworkFactory.builder()
+            .connectString(zkConfig.getConnectString())
+            .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
+            .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
+            .retryPolicy(retryPolicy)
+            .defaultData(new byte[0])
+            .build();
+
         curatorClient.start();
 
         // Call #register for each already-registered role. This will
@@ -227,6 +238,7 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
                     logger.error("This node was elected Leader for Role '{}' 
but failed to take leadership. Will relinquish leadership role. Failure was due 
to: {}", roleName, e);
                     logger.error("", e);
                     leader = false;
+                    Thread.sleep(1000L);
                     return;
                 }
             }
@@ -251,8 +263,10 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
                     try {
                         listener.onLeaderRelinquish();
                     } catch (final Exception e) {
-                        logger.error("This node is no longer leader for role 
'{}' but failed to shutdown leadership responsibilities properly due to: {}", 
roleName, e);
-                        logger.error("", e);
+                        logger.error("This node is no longer leader for role 
'{}' but failed to shutdown leadership responsibilities properly due to: {}", 
roleName, e.toString());
+                        if (logger.isDebugEnabled()) {
+                            logger.error("", e);
+                        }
                     }
                 }
             }

Reply via email to