http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
index fc75601..81e3e4c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.cluster.manager.impl;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -25,6 +27,7 @@ import 
org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.flow.DataFlowManagementService;
 import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.node.Node.Status;
 import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
@@ -41,10 +44,12 @@ public class WebClusterManagerCoordinator implements 
ClusterCoordinator {
 
     private final WebClusterManager clusterManager;
     private final ClusterManagerProtocolSender protocolSender;
+    private final DataFlowManagementService dfmService;
 
-    public WebClusterManagerCoordinator(final WebClusterManager 
clusterManager, final ClusterManagerProtocolSender protocolSender) {
+    public WebClusterManagerCoordinator(final WebClusterManager 
clusterManager, final ClusterManagerProtocolSender protocolSender, final 
DataFlowManagementService dfmService) {
         this.clusterManager = clusterManager;
         this.protocolSender = protocolSender;
+        this.dfmService = dfmService;
     }
 
     @Override
@@ -196,6 +201,8 @@ public class WebClusterManagerCoordinator implements 
ClusterCoordinator {
             protocolSender.notifyNodeStatusChange(nodesToNotify, message);
         }
 
+        
dfmService.setNodeIds(getNodeIdentifiers(NodeConnectionState.CONNECTED));
+
         return true;
     }
 
@@ -241,6 +248,17 @@ public class WebClusterManagerCoordinator implements 
ClusterCoordinator {
             message.setStatusUpdateIdentifier(statusUpdateId);
 
             protocolSender.notifyNodeStatusChange(nodesToNotify, message);
+            
dfmService.setNodeIds(getNodeIdentifiers(NodeConnectionState.CONNECTED));
         }
     }
+
+    /**
+     * Groups the identifiers of the nodes returned by the cluster manager according to the
+     * connection state that this coordinator currently reports for each of them.
+     *
+     * @return a map from connection state to the identifiers of the nodes in that state
+     */
+    @Override
+    public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() {
+        return clusterManager.getNodes().stream()
+            .map(Node::getNodeId)
+            .collect(Collectors.groupingBy(id -> getConnectionStatus(id).getState()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 2b3bff9..a06bdaf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -20,8 +20,6 @@ import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.cluster.event.EventManager;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.flow.DataFlowManagementService;
-import org.apache.nifi.cluster.manager.HttpRequestReplicator;
-import org.apache.nifi.cluster.manager.HttpResponseMapper;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import 
org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
 import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
@@ -61,15 +59,11 @@ public class WebClusterManagerFactoryBean implements 
FactoryBean, ApplicationCon
              */
             return null;
         } else if (clusterManager == null) {
-            final HttpRequestReplicator requestReplicator = 
applicationContext.getBean("httpRequestReplicator", 
HttpRequestReplicator.class);
-            final HttpResponseMapper responseMapper = 
applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class);
             final DataFlowManagementService dataFlowService = 
applicationContext.getBean("dataFlowManagementService", 
DataFlowManagementService.class);
             final ClusterManagerProtocolSenderListener senderListener = 
applicationContext.getBean("clusterManagerProtocolSenderListener", 
ClusterManagerProtocolSenderListener.class);
 
             // create the manager
             clusterManager = new WebClusterManager(
-                    requestReplicator,
-                    responseMapper,
                     dataFlowService,
                     senderListener,
                     properties,

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index 72c7bff..1de15cf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -25,38 +25,6 @@
     http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context-3.1.xsd
     http://www.springframework.org/schema/aop 
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd";>
 
-    <!-- jersey client -->
-    <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" 
factory-method="createClient">
-        <constructor-arg>
-            <bean 
class="com.sun.jersey.api.client.config.DefaultClientConfig"/>
-        </constructor-arg>
-        <constructor-arg>
-            <bean 
class="org.apache.nifi.framework.security.util.SslContextFactory" 
factory-method="createSslContext">
-                <constructor-arg ref="nifiProperties"/>
-            </bean>
-        </constructor-arg>
-    </bean>
-
-    <!-- http request replicator -->
-    <bean id="httpRequestReplicator" 
class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl">
-        <constructor-arg index="0">
-            <bean factory-bean="nifiProperties" 
factory-method="getClusterManagerNodeApiRequestThreads"/>
-        </constructor-arg>
-        <constructor-arg ref="jersey-client" index="1"/>
-        <constructor-arg index="2">
-            <bean factory-bean="nifiProperties" 
factory-method="getClusterManagerNodeApiConnectionTimeout"/>
-        </constructor-arg>
-        <constructor-arg index="3">
-            <bean factory-bean="nifiProperties" 
factory-method="getClusterManagerNodeApiReadTimeout"/>
-        </constructor-arg>
-        <property name="nodeProtocolScheme">
-            <bean factory-bean="nifiProperties" 
factory-method="getClusterProtocolManagerToNodeApiScheme"/>
-        </property>
-    </bean>
-    
-    <!-- http response mapper -->
-    <bean id="httpResponseMapper" 
class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/>
-
     <!-- cluster flow DAO -->
     <bean id="dataFlowDao" 
class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl">
         <constructor-arg index="0">

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index e218e05..7d62b54 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -283,6 +283,11 @@ public class TestAbstractHeartbeatMonitor {
         public NodeIdentifier getNodeIdentifier(final String uuid) {
             return statuses.keySet().stream().filter(p -> 
p.getId().equals(uuid)).findFirst().orElse(null);
         }
+
+        /**
+         * Buckets every key of {@code statuses} by the connection state currently recorded for it.
+         *
+         * @return a map from connection state to the node identifiers in that state
+         */
+        @Override
+        public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() {
+            return statuses.keySet()
+                .stream()
+                .collect(Collectors.groupingBy(id -> getConnectionStatus(id).getState()));
+        }
     }
 
     public static class ReportedEvent {

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestCuratorHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestCuratorHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestCuratorHeartbeatMonitor.java
new file mode 100644
index 0000000..3652595
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestCuratorHeartbeatMonitor.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCuratorHeartbeatMonitor {
+    private TestingServer zkServer;
+    private NodeIdentifier nodeId;
+    private TestFriendlyHeartbeatMonitor monitor;
+
+    @Before
+    public void setup() throws Exception {
+        zkServer = new TestingServer(true);
+        zkServer.start();
+        nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
9999, "localhost", 8888, "localhost", null, false);
+    }
+
+    /**
+     * Tears down the fixtures created for a test: stops the heartbeat monitor (if one was created)
+     * and then shuts down the ZooKeeper test server.
+     *
+     * @throws IOException if the ZooKeeper test server fails to stop or close
+     */
+    @After
+    public void clear() throws IOException {
+        try {
+            // Stop the monitor first, while the ZooKeeper server it was configured with is still running.
+            if (monitor != null) {
+                monitor.stop();
+            }
+        } finally {
+            // Always release the server, even if stopping the monitor failed, so it cannot leak into later tests.
+            if (zkServer != null) {
+                zkServer.stop();
+                zkServer.close();
+            }
+        }
+    }
+
+    /**
+     * Verifies that a node that sends a heartbeat that indicates that it is 
'connected' is asked to connect to
+     * cluster if the cluster coordinator does not know about the node
+     *
+     * @throws InterruptedException if interrupted
+     */
+    @Test
+    public void testNewConnectedHeartbeatFromUnknownNode() throws IOException, 
InterruptedException {
+        final List<NodeIdentifier> requestedToConnect = 
Collections.synchronizedList(new ArrayList<>());
+        final ClusterCoordinatorAdapter coordinator = new 
ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier 
nodeId) {
+                requestedToConnect.add(nodeId);
+            }
+        };
+
+        final TestFriendlyHeartbeatMonitor monitor = 
createMonitor(coordinator);
+
+        // Ensure that we request the Unknown Node connect to the cluster
+        final NodeHeartbeat heartbeat = createHeartbeat(nodeId, 
NodeConnectionState.CONNECTED);
+        monitor.addHeartbeat(heartbeat);
+        monitor.waitForProcessed();
+
+        assertEquals(1, requestedToConnect.size());
+        assertEquals(nodeId, requestedToConnect.get(0));
+        assertEquals(1, coordinator.getEvents().size());
+    }
+
+    /**
+     * Verifies that a node that sends a heartbeat that indicates that it is 
'connected' if previously
+     * manually disconnected, will be asked to disconnect from the cluster 
again.
+     *
+     * @throws InterruptedException if interrupted
+     */
+    @Test
+    public void testHeartbeatFromManuallyDisconnectedNode() throws 
InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = 
Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> requestedToDisconnect = 
Collections.synchronizedSet(new HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new 
ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier 
nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void requestNodeDisconnect(final 
NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String 
explanation) {
+                super.requestNodeDisconnect(nodeId, disconnectionCode, 
explanation);
+                requestedToDisconnect.add(nodeId);
+            }
+        };
+
+        final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+        adapter.requestNodeDisconnect(nodeId, 
DisconnectionCode.USER_DISCONNECTED, "Unit Testing");
+        monitor.addHeartbeat(createHeartbeat(nodeId, 
NodeConnectionState.CONNECTED));
+        monitor.waitForProcessed();
+
+        assertEquals(1, requestedToDisconnect.size());
+        assertEquals(nodeId, requestedToDisconnect.iterator().next());
+        assertTrue(requestedToConnect.isEmpty());
+    }
+
+
+    @Test
+    public void testConnectingNodeMarkedConnectedWhenHeartbeatReceived() 
throws InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = 
Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> connected = Collections.synchronizedSet(new 
HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new 
ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier 
nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void finishNodeConnection(final NodeIdentifier 
nodeId) {
+                super.finishNodeConnection(nodeId);
+                connected.add(nodeId);
+            }
+        };
+
+        final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+        adapter.requestNodeConnect(nodeId); // set state to 'connecting'
+        requestedToConnect.clear();
+
+        monitor.addHeartbeat(createHeartbeat(nodeId, 
NodeConnectionState.CONNECTED));
+        monitor.waitForProcessed();
+
+        assertEquals(1, connected.size());
+        assertEquals(nodeId, connected.iterator().next());
+        assertTrue(requestedToConnect.isEmpty());
+    }
+
+
+    @Test
+    public void testDisconnectedHeartbeatOnStartup() throws 
InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = 
Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> connected = Collections.synchronizedSet(new 
HashSet<>());
+        final Set<NodeIdentifier> disconnected = 
Collections.synchronizedSet(new HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new 
ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier 
nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void finishNodeConnection(final NodeIdentifier 
nodeId) {
+                super.finishNodeConnection(nodeId);
+                connected.add(nodeId);
+            }
+
+            @Override
+            public synchronized void requestNodeDisconnect(final 
NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String 
explanation) {
+                super.requestNodeDisconnect(nodeId, disconnectionCode, 
explanation);
+                disconnected.add(nodeId);
+            }
+        };
+
+        final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+        requestedToConnect.clear();
+
+        monitor.addHeartbeat(createHeartbeat(nodeId, 
DisconnectionCode.NODE_SHUTDOWN));
+        monitor.waitForProcessed();
+
+        assertTrue(connected.isEmpty());
+        assertTrue(requestedToConnect.isEmpty());
+        assertTrue(disconnected.isEmpty());
+    }
+
+    private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode) {
+        final NodeConnectionStatus status = new 
NodeConnectionStatus(disconnectionCode);
+        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), 
status, false, 0, 0, 0, 0);
+    }
+
+    private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final 
NodeConnectionState state) {
+        final NodeConnectionStatus status = new NodeConnectionStatus(state);
+        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), 
status, false, 0, 0, 0, 0);
+    }
+
+    private TestFriendlyHeartbeatMonitor createMonitor(final 
ClusterCoordinator coordinator) {
+        monitor = new TestFriendlyHeartbeatMonitor(coordinator, 
createProperties());
+        monitor.start();
+        return monitor;
+    }
+
+    private Properties createProperties() {
+        final Properties properties = new Properties();
+        
properties.setProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, "10 
ms");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, 
zkServer.getConnectString());
+        properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, "3 
secs");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, "3 
secs");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, "/nifi");
+        return properties;
+    }
+
+    private static class ClusterCoordinatorAdapter implements 
ClusterCoordinator {
+        private final Map<NodeIdentifier, NodeConnectionStatus> statuses = new 
HashMap<>();
+        private final List<ReportedEvent> events = new ArrayList<>();
+
+        @Override
+        public synchronized void requestNodeConnect(NodeIdentifier nodeId) {
+            statuses.put(nodeId, new 
NodeConnectionStatus(NodeConnectionState.CONNECTING));
+        }
+
+        @Override
+        public synchronized void finishNodeConnection(NodeIdentifier nodeId) {
+            statuses.put(nodeId, new 
NodeConnectionStatus(NodeConnectionState.CONNECTED));
+        }
+
+        @Override
+        public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, 
DisconnectionCode disconnectionCode, String explanation) {
+            statuses.put(nodeId, new 
NodeConnectionStatus(NodeConnectionState.DISCONNECTED));
+        }
+
+        @Override
+        public synchronized void disconnectionRequestedByNode(NodeIdentifier 
nodeId, DisconnectionCode disconnectionCode, String explanation) {
+            statuses.put(nodeId, new 
NodeConnectionStatus(NodeConnectionState.DISCONNECTED));
+        }
+
+        @Override
+        public synchronized NodeConnectionStatus 
getConnectionStatus(NodeIdentifier nodeId) {
+            return statuses.get(nodeId);
+        }
+
+        @Override
+        public synchronized Set<NodeIdentifier> 
getNodeIdentifiers(NodeConnectionState state) {
+            return statuses.entrySet().stream().filter(p -> 
p.getValue().getState() == state).map(p -> 
p.getKey()).collect(Collectors.toSet());
+        }
+
+        @Override
+        public synchronized boolean isBlockedByFirewall(String hostname) {
+            return false;
+        }
+
+        @Override
+        public synchronized void reportEvent(NodeIdentifier nodeId, Severity 
severity, String event) {
+            events.add(new ReportedEvent(nodeId, severity, event));
+        }
+
+        @Override
+        public synchronized void setPrimaryNode(NodeIdentifier nodeId) {
+        }
+
+        synchronized List<ReportedEvent> getEvents() {
+            return new ArrayList<>(events);
+        }
+
+        /**
+         * Looks up, by node ID, the identifier of a node that has a recorded status.
+         * Synchronized like the other accessors of this class because {@code statuses} is a plain
+         * HashMap that the synchronized mutators modify.
+         *
+         * @param uuid the node ID to search for
+         * @return the matching identifier, or {@code null} if no recorded node has that ID
+         */
+        @Override
+        public synchronized NodeIdentifier getNodeIdentifier(final String uuid) {
+            return statuses.keySet().stream()
+                .filter(p -> p.getId().equals(uuid))
+                .findFirst()
+                .orElse(null);
+        }
+
+        /**
+         * Groups every node that has a recorded status by its current connection state.
+         * Returns a real (possibly empty) map rather than {@code null} so callers can iterate the result.
+         *
+         * @return a map from connection state to the identifiers of the nodes in that state
+         */
+        @Override
+        public synchronized Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() {
+            return statuses.keySet().stream()
+                .collect(Collectors.groupingBy(nodeId -> getConnectionStatus(nodeId).getState()));
+        }
+    }
+
+    public static class ReportedEvent {
+        private final NodeIdentifier nodeId;
+        private final Severity severity;
+        private final String event;
+
+        public ReportedEvent(NodeIdentifier nodeId, Severity severity, String 
event) {
+            this.nodeId = nodeId;
+            this.severity = severity;
+            this.event = event;
+        }
+
+        public NodeIdentifier getNodeId() {
+            return nodeId;
+        }
+
+        public Severity getSeverity() {
+            return severity;
+        }
+
+        public String getEvent() {
+            return event;
+        }
+    }
+
+
+    /**
+     * Heartbeat monitor for tests: heartbeats are supplied from an in-memory map instead of being
+     * fetched by the superclass, and a test can block until a monitoring cycle has completed.
+     */
+    private static class TestFriendlyHeartbeatMonitor extends CuratorHeartbeatMonitor {
+        private final Map<NodeIdentifier, NodeHeartbeat> heartbeats = new HashMap<>();
+        private final Object mutex = new Object();
+        // Number of monitoring cycles that have finished; guarded by 'mutex'.
+        private long completedCycles = 0L;
+
+        public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, Properties properties) {
+            super(clusterCoordinator, properties);
+        }
+
+        /**
+         * @return the live map of heartbeats registered through {@link #addHeartbeat(NodeHeartbeat)}
+         */
+        @Override
+        synchronized Map<NodeIdentifier, NodeHeartbeat> fetchHeartbeats() {
+            return heartbeats;
+        }
+
+        /**
+         * Runs the superclass monitoring cycle and then wakes every thread blocked in
+         * {@link #waitForProcessed()}.
+         */
+        @Override
+        synchronized void monitorHeartbeats() {
+            super.monitorHeartbeats();
+
+            synchronized (mutex) {
+                completedCycles++;
+                mutex.notifyAll();
+            }
+        }
+
+        /**
+         * Registers a heartbeat, keyed by its node identifier, for subsequent monitoring cycles.
+         *
+         * @param heartbeat the heartbeat to register
+         */
+        synchronized void addHeartbeat(final NodeHeartbeat heartbeat) {
+            heartbeats.put(heartbeat.getNodeIdentifier(), heartbeat);
+        }
+
+        @Override
+        public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
+            heartbeats.remove(nodeId);
+        }
+
+        /**
+         * Blocks until a monitoring cycle completes after this method was entered.
+         *
+         * @throws InterruptedException if the calling thread is interrupted while waiting
+         */
+        void waitForProcessed() throws InterruptedException {
+            synchronized (mutex) {
+                final long cyclesAtEntry = completedCycles;
+                // Object.wait() may return spuriously, so only proceed once a cycle has really finished.
+                while (completedCycles == cyclesAtEntry) {
+                    mutex.wait();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
new file mode 100644
index 0000000..f4d5d2e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.junit.Test;
+
+public class TestProcessorEndpointMerger {
+
+    @Test
+    public void testMergeValidationErrors() {
+        final ProcessorEndpointMerger merger = new ProcessorEndpointMerger();
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
+
+        final NodeIdentifier nodeId1234 = new NodeIdentifier("1234", 
"localhost", 9000, "localhost", 9001, "localhost", 9002, false);
+        final List<String> nodeValidationErrors1234 = new ArrayList<>();
+        nodeValidationErrors1234.add("error 1");
+        nodeValidationErrors1234.add("error 2");
+
+        merger.mergeValidationErrors(validationErrorMap, nodeId1234, 
nodeValidationErrors1234);
+
+        final NodeIdentifier nodeXyz = new NodeIdentifier("xyz", "localhost", 
8000, "localhost", 8001, "localhost", 8002, false);
+        final List<String> nodeValidationErrorsXyz = new ArrayList<>();
+        nodeValidationErrorsXyz.add("error 1");
+
+        merger.mergeValidationErrors(validationErrorMap, nodeXyz, 
nodeValidationErrorsXyz);
+
+        assertEquals(2, validationErrorMap.size());
+
+        final Set<NodeIdentifier> idsError1 = validationErrorMap.get("error 
1");
+        assertEquals(2, idsError1.size());
+        assertTrue(idsError1.contains(nodeId1234));
+        assertTrue(idsError1.contains(nodeXyz));
+
+        final Set<NodeIdentifier> idsError2 = validationErrorMap.get("error 
2");
+        assertEquals(1, idsError2.size());
+        assertTrue(idsError2.contains(nodeId1234));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java
new file mode 100644
index 0000000..dba973c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import static org.junit.Assert.assertEquals;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+
+import org.junit.Test;
+
+public class TestStatusHistoryEndpointMerger {
+    /**
+     * Asserts that normalizeStatusSnapshotDate, called with a 300,000 ms interval, returns a date on
+     * an interval boundary unchanged and maps dates just before and just after a boundary to the
+     * preceding boundary.
+     *
+     * @throws ParseException if one of the fixture timestamps cannot be parsed
+     */
+    @Test
+    public void testNormalizedStatusSnapshotDate() throws ParseException {
+        // 'ss' is second-in-minute. The former 'SS' is a millisecond field, so "59" was read as
+        // milliseconds and then overwritten by the trailing ".SSS" field, leaving the seconds at zero.
+        final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US);
+        final Date date1 = df.parse("2014/01/01 00:00:00.000");
+        final Date date2 = df.parse("2014/01/01 00:04:59.999");
+        final Date date3 = df.parse("2014/01/01 00:05:00.000");
+        final Date date4 = df.parse("2014/01/01 00:05:00.001");
+
+        final Date normalized1 = StatusHistoryEndpointMerger.normalizeStatusSnapshotDate(date1, 300000);
+        assertEquals(date1, normalized1);
+
+        final Date normalized2 = StatusHistoryEndpointMerger.normalizeStatusSnapshotDate(date2, 300000);
+        assertEquals(date1, normalized2);
+
+        final Date normalized3 = StatusHistoryEndpointMerger.normalizeStatusSnapshotDate(date3, 300000);
+        assertEquals(date3, normalized3);
+
+        final Date normalized4 = StatusHistoryEndpointMerger.normalizeStatusSnapshotDate(date4, 300000);
+        assertEquals(date3, normalized4);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
new file mode 100644
index 0000000..c97db03
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.sun.jersey.api.client.ClientResponse;
+
+public class TestResponseUtils {
+
+    @Test
+    public void testFindLongResponseTimes() throws URISyntaxException {
+        final Map<NodeIdentifier, NodeResponse> responses = new HashMap<>();
+        final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8000, 
"localhost", 8001, "localhost", 8002, false);
+        final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, 
"localhost", 8201, "localhost", 8202, false);
+        final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, 
"localhost", 8301, "localhost", 8302, false);
+        final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, 
"localhost", 8401, "localhost", 8402, false);
+
+        final URI uri = new URI("localhost:8080");
+        final ClientResponse clientResponse = 
Mockito.mock(ClientResponse.class);
+        responses.put(id1, new NodeResponse(id1, "GET", uri, clientResponse, 
TimeUnit.MILLISECONDS.toNanos(80), "1"));
+        responses.put(id2, new NodeResponse(id1, "GET", uri, clientResponse, 
TimeUnit.MILLISECONDS.toNanos(92), "1"));
+        responses.put(id3, new NodeResponse(id1, "GET", uri, clientResponse, 
TimeUnit.MILLISECONDS.toNanos(3), "1"));
+        responses.put(id4, new NodeResponse(id1, "GET", uri, clientResponse, 
TimeUnit.MILLISECONDS.toNanos(120), "1"));
+
+        final AsyncClusterResponse response = new AsyncClusterResponse() {
+            @Override
+            public String getRequestIdentifier() {
+                return "1";
+            }
+
+            @Override
+            public String getMethod() {
+                return "GET";
+            }
+
+            @Override
+            public String getURIPath() {
+                return null;
+            }
+
+            @Override
+            public Set<NodeIdentifier> getNodesInvolved() {
+                return new HashSet<>(responses.keySet());
+            }
+
+            @Override
+            public Set<NodeIdentifier> getCompletedNodeIdentifiers() {
+                return getNodesInvolved();
+            }
+
+            @Override
+            public boolean isComplete() {
+                return true;
+            }
+
+            @Override
+            public boolean isOlderThan(long time, TimeUnit timeUnit) {
+                return true;
+            }
+
+            @Override
+            public NodeResponse getMergedResponse() {
+                return null;
+            }
+
+            @Override
+            public NodeResponse awaitMergedResponse() throws 
InterruptedException {
+                return null;
+            }
+
+            @Override
+            public NodeResponse awaitMergedResponse(long timeout, TimeUnit 
timeUnit) throws InterruptedException {
+                return null;
+            }
+
+            @Override
+            public NodeResponse getNodeResponse(NodeIdentifier nodeId) {
+                return responses.get(nodeId);
+            }
+
+            @Override
+            public Set<NodeResponse> getCompletedNodeResponses() {
+                return new HashSet<>(responses.values());
+            }
+        };
+
+        Set<NodeIdentifier> slowResponses = 
ResponseUtils.findLongResponseTimes(response, 1.5D);
+        assertTrue(slowResponses.isEmpty());
+
+        responses.put(id4, new NodeResponse(id1, "GET", uri, clientResponse, 
TimeUnit.MILLISECONDS.toNanos(2500), "1"));
+        slowResponses = ResponseUtils.findLongResponseTimes(response, 1.5D);
+        assertEquals(1, slowResponses.size());
+        assertEquals(id4, slowResponses.iterator().next());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
new file mode 100644
index 0000000..6d3571b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.HttpMethod;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.flow.DataFlowManagementService;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.StandardOptimisticLockingManager;
+import org.apache.nifi.web.api.entity.Entity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.header.InBoundHeaders;
+import com.sun.jersey.core.header.OutBoundHeaders;
+
+public class TestThreadPoolRequestReplicator {
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
"src/test/resources/conf/nifi.properties");
+    }
+
+    /**
+     * If we replicate a request, whenever we obtain the merged response from 
the AsyncClusterResponse object,
+     * the response should no longer be available and should be cleared from 
internal state. This test is to
+     * verify that this behavior occurs.
+     */
+    @Test
+    public void testResponseRemovedWhenCompletedAndFetched() {
+        withReplicator(replicator -> {
+            final Set<NodeIdentifier> nodeIds = new HashSet<>();
+            nodeIds.add(new NodeIdentifier("1", "localhost", 8000, 
"localhost", 8001, "localhost", 8002, false));
+            final URI uri = new URI("http://localhost:8080/processors/1";);
+            final Entity entity = new ProcessorEntity();
+
+            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+
+            // We should get back the same response object
+            assertTrue(response == 
replicator.getClusterResponse(response.getRequestIdentifier()));
+
+            assertEquals(HttpMethod.GET, response.getMethod());
+            assertEquals(nodeIds, response.getNodesInvolved());
+
+            assertTrue(response == 
replicator.getClusterResponse(response.getRequestIdentifier()));
+
+            final NodeResponse nodeResponse = response.awaitMergedResponse(3, 
TimeUnit.SECONDS);
+            assertEquals(8000, nodeResponse.getNodeId().getApiPort());
+            assertEquals(ClientResponse.Status.OK.getStatusCode(), 
nodeResponse.getStatus());
+
+            
assertNull(replicator.getClusterResponse(response.getRequestIdentifier()));
+        });
+    }
+
+
+    @Test(timeout = 15000)
+    public void testLongWaitForResponse() {
+        withReplicator(replicator -> {
+            final Set<NodeIdentifier> nodeIds = new HashSet<>();
+            final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 
8000, "localhost", 8001, "localhost", 8002, false);
+            nodeIds.add(nodeId);
+            final URI uri = new URI("http://localhost:8080/processors/1";);
+            final Entity entity = new ProcessorEntity();
+
+            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+
+            // We should get back the same response object
+            assertTrue(response == 
replicator.getClusterResponse(response.getRequestIdentifier()));
+            assertFalse(response.isComplete());
+
+            final NodeResponse nodeResponse = response.getNodeResponse(nodeId);
+            assertNull(nodeResponse);
+
+            final NodeResponse completedNodeResponse = 
response.awaitMergedResponse(2, TimeUnit.SECONDS);
+            assertNotNull(completedNodeResponse);
+            assertNotNull(completedNodeResponse.getThrowable());
+            assertEquals(500, completedNodeResponse.getStatus());
+
+            assertTrue(response.isComplete());
+            assertNotNull(response.getMergedResponse());
+            
assertNull(replicator.getClusterResponse(response.getRequestIdentifier()));
+        } , Status.OK, 1000, new ClientHandlerException(new 
SocketTimeoutException()));
+    }
+
+    @Test(timeout = 15000)
+    public void testCompleteOnError() {
+        withReplicator(replicator -> {
+            final Set<NodeIdentifier> nodeIds = new HashSet<>();
+            final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 
8100, "localhost", 8101, "localhost", 8102, false);
+            final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 
8200, "localhost", 8201, "localhost", 8202, false);
+            final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 
8300, "localhost", 8301, "localhost", 8302, false);
+            final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 
8400, "localhost", 8401, "localhost", 8402, false);
+            nodeIds.add(id1);
+            nodeIds.add(id2);
+            nodeIds.add(id3);
+            nodeIds.add(id4);
+
+            final URI uri = new URI("http://localhost:8080/processors/1";);
+            final Entity entity = new ProcessorEntity();
+
+            final AsyncClusterResponse response = 
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+            assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS));
+        } , null, 0L, new IllegalArgumentException("Exception created for unit 
test"));
+    }
+
+
+    @Test(timeout = 15000)
+    public void testMultipleRequestWithTwoPhaseCommit() {
+        final Set<NodeIdentifier> nodeIds = new HashSet<>();
+        nodeIds.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 
8101, "localhost", 8102, false));
+
+        final ClusterCoordinator coordinator = 
Mockito.mock(ClusterCoordinator.class);
+
+        final AtomicInteger requestCount = new AtomicInteger(0);
+        final DataFlowManagementService dfmService = 
Mockito.mock(DataFlowManagementService.class);
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator,
+            "1 sec", "1 sec", null, null, null, new 
StandardOptimisticLockingManager(), dfmService) {
+            @Override
+            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
+                // the resource builder will not expose its headers to us, so 
we are using Mockito's Whitebox class to extract them.
+                final OutBoundHeaders headers = (OutBoundHeaders) 
Whitebox.getInternalState(resourceBuilder, "metadata");
+                final Object expectsHeader = 
headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER);
+
+                final int statusCode;
+                if (requestCount.incrementAndGet() == 1) {
+                    assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, 
expectsHeader);
+                    statusCode = 150;
+                } else {
+                    assertNull(expectsHeader);
+                    statusCode = Status.OK.getStatusCode();
+                }
+
+                // Return given response from all nodes.
+                final ClientResponse clientResponse = new 
ClientResponse(statusCode, new InBoundHeaders(), new ByteArrayInputStream(new 
byte[0]), null);
+                return new NodeResponse(nodeId, method, uri, clientResponse, 
-1L, requestId);
+            }
+        };
+
+        replicator.start();
+
+        try {
+            final AsyncClusterResponse clusterResponse = 
replicator.replicate(nodeIds, HttpMethod.POST,
+                new URI("http://localhost:80/processors/1";), new 
ProcessorEntity(), new HashMap<>());
+            clusterResponse.awaitMergedResponse();
+
+            // Ensure that we received two requests - the first should contain 
the X-NcmExpects header; the second should not.
+            // These assertions are validated above, in the overridden 
replicateRequest method.
+            assertEquals(2, requestCount.get());
+        } catch (final Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.toString());
+        } finally {
+            replicator.stop();
+        }
+    }
+
+
+    @Test(timeout = 15000)
+    public void testOneNodeRejectsTwoPhaseCommit() {
+        final Set<NodeIdentifier> nodeIds = new HashSet<>();
+        nodeIds.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 
8101, "localhost", 8102, false));
+        nodeIds.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 
8201, "localhost", 8202, false));
+
+        final ClusterCoordinator coordinator = 
Mockito.mock(ClusterCoordinator.class);
+
+        final AtomicInteger requestCount = new AtomicInteger(0);
+        final DataFlowManagementService dfmService = 
Mockito.mock(DataFlowManagementService.class);
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator,
+            "1 sec", "1 sec", null, null, null, new 
StandardOptimisticLockingManager(), dfmService) {
+            @Override
+            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
+                // the resource builder will not expose its headers to us, so 
we are using Mockito's Whitebox class to extract them.
+                final OutBoundHeaders headers = (OutBoundHeaders) 
Whitebox.getInternalState(resourceBuilder, "metadata");
+                final Object expectsHeader = 
headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER);
+
+                final int requestIndex = requestCount.incrementAndGet();
+                assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, 
expectsHeader);
+
+                if (requestIndex == 1) {
+                    final ClientResponse clientResponse = new 
ClientResponse(150, new InBoundHeaders(), new ByteArrayInputStream(new 
byte[0]), null);
+                    return new NodeResponse(nodeId, method, uri, 
clientResponse, -1L, requestId);
+                } else {
+                    final IllegalClusterStateException explanation = new 
IllegalClusterStateException("Intentional Exception for Unit Testing");
+                    return new NodeResponse(nodeId, method, uri, explanation);
+                }
+            }
+        };
+
+        replicator.start();
+
+        try {
+            final AsyncClusterResponse clusterResponse = 
replicator.replicate(nodeIds, HttpMethod.POST,
+                new URI("http://localhost:80/processors/1";), new 
ProcessorEntity(), new HashMap<>());
+            clusterResponse.awaitMergedResponse();
+
+            Assert.fail("Expected to get an IllegalClusterStateException but 
did not");
+        } catch (final IllegalClusterStateException e) {
+            // Expected
+        } catch (final Exception e) {
+            Assert.fail(e.toString());
+        } finally {
+            replicator.stop();
+        }
+    }
+
+
+
+    private void withReplicator(final WithReplicator function) {
+        withReplicator(function, ClientResponse.Status.OK, 0L, null);
+    }
+
+    private void withReplicator(final WithReplicator function, final Status 
status, final long delayMillis, final RuntimeException failure) {
+        final ClusterCoordinator coordinator = 
Mockito.mock(ClusterCoordinator.class);
+
+        final DataFlowManagementService dfmService = 
Mockito.mock(DataFlowManagementService.class);
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator,
+            "1 sec", "1 sec", null, null, null, new 
StandardOptimisticLockingManager(), dfmService) {
+            @Override
+            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
+                if (delayMillis > 0L) {
+                    try {
+                        Thread.sleep(delayMillis);
+                    } catch (InterruptedException e) {
+                        Assert.fail("Thread Interrupted durating test");
+                    }
+                }
+
+                if (failure != null) {
+                    throw failure;
+                }
+
+                // Return given response from all nodes.
+                final ClientResponse clientResponse = new 
ClientResponse(status, new InBoundHeaders(), new ByteArrayInputStream(new 
byte[0]), null);
+                return new NodeResponse(nodeId, method, uri, clientResponse, 
-1L, requestId);
+            }
+        };
+
+        replicator.start();
+
+        try {
+            function.withReplicator(replicator);
+        } catch (final Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.toString());
+        } finally {
+            replicator.stop();
+        }
+    }
+
+    private interface WithReplicator {
+        void withReplicator(ThreadPoolRequestReplicator replicator) throws 
Exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
index e3e9ac6..6b93ef3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
@@ -44,6 +44,7 @@ import org.apache.nifi.cluster.manager.testutils.HttpResponse;
 import org.apache.nifi.cluster.manager.testutils.HttpResponseAction;
 import org.apache.nifi.cluster.manager.testutils.HttpServer;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.util.NiFiProperties;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -69,6 +70,7 @@ public class HttpRequestReplicatorImplTest {
 
     @Before
     public void setUp() throws IOException, URISyntaxException {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
getClass().getResource("/conf/nifi.properties").getFile());
 
         executorThreadCount = 5;
         serverThreadCount = 3;

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
deleted file mode 100644
index 55c6c31..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.impl;
-
-import static org.junit.Assert.assertEquals;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-
-import org.junit.Test;
-
-public class TestWebClusterManager {
-
-    @Test
-    public void testNormalizedStatusSnapshotDate() throws ParseException {
-        final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:SS.SSS", 
Locale.US);
-        final Date date1 = df.parse("2014/01/01 00:00:00.000");
-        final Date date2 = df.parse("2014/01/01 00:04:59.999");
-        final Date date3 = df.parse("2014/01/01 00:05:00.000");
-        final Date date4 = df.parse("2014/01/01 00:05:00.001");
-
-        final Date normalized1 = 
WebClusterManager.normalizeStatusSnapshotDate(date1, 300000);
-        assertEquals(date1, normalized1);
-
-        final Date normalized2 = 
WebClusterManager.normalizeStatusSnapshotDate(date2, 300000);
-        assertEquals(date1, normalized2);
-
-        final Date normalized3 = 
WebClusterManager.normalizeStatusSnapshotDate(date3, 300000);
-        assertEquals(date3, normalized3);
-
-        final Date normalized4 = 
WebClusterManager.normalizeStatusSnapshotDate(date4, 300000);
-        assertEquals(date3, normalized4);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
new file mode 100644
index 0000000..78a649b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Core Properties #
+nifi.version=nifi-test 3.0.0
+nifi.flow.configuration.file=./target/flow.xml.gz
+nifi.flow.configuration.archive.dir=./target/archive/
+nifi.flowcontroller.autoResumeState=true
+nifi.flowcontroller.graceful.shutdown.period=10 sec
+nifi.flowservice.writedelay.interval=2 sec
+nifi.administrative.yield.duration=30 sec
+
+nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
+nifi.controller.service.configuration.file=./target/controller-services.xml
+nifi.templates.directory=./target/templates
+nifi.ui.banner.text=UI Banner Text
+nifi.ui.autorefresh.interval=30 sec
+nifi.nar.library.directory=./target/lib
+nifi.nar.working.directory=./target/work/nar/
+
+# H2 Settings
+nifi.database.directory=./database_repository
+nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
+
+# FlowFile Repository
+nifi.flowfile.repository.directory=./target/test-repo
+nifi.flowfile.repository.partitions=1
+nifi.flowfile.repository.checkpoint.interval=2 mins
+nifi.queue.swap.threshold=20000
+nifi.swap.storage.directory=./target/test-repo/swap
+nifi.swap.in.period=5 sec
+nifi.swap.in.threads=1
+nifi.swap.out.period=5 sec
+nifi.swap.out.threads=4
+
+# Content Repository
+nifi.content.claim.max.appendable.size=10 MB
+nifi.content.claim.max.flow.files=100
+nifi.content.repository.directory.default=./target/content_repository
+
+# Provenance Repository Properties
+nifi.provenance.repository.storage.directory=./target/provenance_repository
+nifi.provenance.repository.max.storage.time=24 hours
+nifi.provenance.repository.max.storage.size=1 GB
+nifi.provenance.repository.rollover.time=30 secs
+nifi.provenance.repository.rollover.size=100 MB
+
+# Site to Site properties
+nifi.remote.input.socket.port=9990
+nifi.remote.input.secure=true
+
+# web properties #
+nifi.web.war.directory=./target/lib
+nifi.web.http.host=
+nifi.web.http.port=8080
+nifi.web.https.host=
+nifi.web.https.port=
+nifi.web.jetty.working.directory=./target/work/jetty
+
+# security properties #
+nifi.sensitive.props.key=key
+nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
+nifi.sensitive.props.provider=BC
+
+nifi.security.keystore=
+nifi.security.keystoreType=
+nifi.security.keystorePasswd=
+nifi.security.keyPasswd=
+nifi.security.truststore=
+nifi.security.truststoreType=
+nifi.security.truststorePasswd=
+nifi.security.needClientAuth=
+nifi.security.authorizedUsers.file=./target/conf/authorized-users.xml
+nifi.security.user.credential.cache.duration=24 hours
+nifi.security.user.authority.provider=nifi.authorization.FileAuthorizationProvider
+nifi.security.support.new.account.requests=
+nifi.security.default.user.roles=
+
+# cluster common properties (cluster manager and nodes must have same values) #
+nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.is.secure=false
+nifi.cluster.protocol.socket.timeout=30 sec
+nifi.cluster.protocol.connection.handshake.timeout=45 sec
+# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured #
+nifi.cluster.protocol.use.multicast=false
+nifi.cluster.protocol.multicast.address=
+nifi.cluster.protocol.multicast.port=
+nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms
+nifi.cluster.protocol.multicast.service.locator.attempts=3
+nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec
+
+# cluster node properties (only configure for cluster nodes) #
+nifi.cluster.is.node=false
+nifi.cluster.node.address=
+nifi.cluster.node.protocol.port=
+nifi.cluster.node.protocol.threads=2
+# if multicast is not used, nifi.cluster.node.unicast.xxx must have same values as nifi.cluster.manager.xxx #
+nifi.cluster.node.unicast.manager.address=
+nifi.cluster.node.unicast.manager.protocol.port=
+nifi.cluster.node.unicast.manager.authority.provider.port=
+
+# cluster manager properties (only configure for cluster manager) #
+nifi.cluster.is.manager=false
+nifi.cluster.manager.address=
+nifi.cluster.manager.protocol.port=
+nifi.cluster.manager.authority.provider.port=
+nifi.cluster.manager.authority.provider.threads=10
+nifi.cluster.manager.node.firewall.file=
+nifi.cluster.manager.node.event.history.size=10
+nifi.cluster.manager.node.api.connection.timeout=30 sec
+nifi.cluster.manager.node.api.read.timeout=30 sec
+nifi.cluster.manager.node.api.request.threads=10
+nifi.cluster.manager.flow.retrieval.delay=5 sec
+nifi.cluster.manager.protocol.threads=10
+nifi.cluster.manager.safemode.duration=0 sec

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index a600699..4db6bc9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.Set;
 
 import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
 /**
@@ -109,7 +110,7 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      *
      * @param serviceNode the node
      */
-    void unscheduleReferencingComponents(ControllerServiceNode serviceNode);
+    Set<ConfiguredComponent> 
unscheduleReferencingComponents(ControllerServiceNode serviceNode);
 
     /**
      * Verifies that all Controller Services referencing the provided Controller
@@ -130,7 +131,7 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      *
      * @param serviceNode the node
      */
-    void disableReferencingServices(ControllerServiceNode serviceNode);
+    Set<ConfiguredComponent> disableReferencingServices(ControllerServiceNode 
serviceNode);
 
     /**
      * Verifies that all Controller Services referencing the provided
@@ -149,8 +150,10 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      * Service A and B will both be enabled.
      *
      * @param serviceNode the node
+     *
+     * @return the set of all components that were updated as a result of this action
      */
-    void enableReferencingServices(ControllerServiceNode serviceNode);
+    Set<ConfiguredComponent> enableReferencingServices(ControllerServiceNode 
serviceNode);
 
     /**
      * Verifies that all enabled Processors referencing the ControllerService
@@ -172,5 +175,5 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      *
      * @param serviceNode the node
      */
-    void scheduleReferencingComponents(ControllerServiceNode serviceNode);
+    Set<ConfiguredComponent> 
scheduleReferencingComponents(ControllerServiceNode serviceNode);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 8c1cb9b..b888a1b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -83,7 +83,7 @@ import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.connectable.StandardConnection;
 import org.apache.nifi.controller.cluster.Heartbeater;
-import org.apache.nifi.controller.cluster.ZooKeeperHeartbeater;
+import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
@@ -534,7 +534,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         if (configuredForClustering) {
             leaderElectionManager = new CuratorLeaderElectionManager(4);
-            heartbeater = new ZooKeeperHeartbeater(protocolSender, properties);
+            heartbeater = new ClusterProtocolHeartbeater(protocolSender, 
properties);
         } else {
             leaderElectionManager = null;
             heartbeater = null;
@@ -1273,6 +1273,11 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private void sendShutdownNotification() {
         // Generate a heartbeat message and publish it, indicating that we are shutting down
         final HeartbeatMessage heartbeatMsg = createHeartbeatMessage();
+        if (heartbeatMsg == null) {
+            LOG.warn("Cannot sent Shutdown Notification Message because node's 
identifier is not known at this time");
+            return;
+        }
+
         final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
         final byte[] hbPayload = heartbeatMsg.getHeartbeat().getPayload();
         final NodeConnectionStatus connectionStatus = new 
NodeConnectionStatus(DisconnectionCode.NODE_SHUTDOWN);
@@ -2540,6 +2545,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             status.setProcessingNanos(0);
             status.setInvocations(0);
             status.setAverageLineageDuration(0L);
+            status.setFlowFilesRemoved(0);
         } else {
             final int processedCount = entry.getFlowFilesOut();
             final long numProcessedBytes = entry.getContentSizeOut();
@@ -2566,6 +2572,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             status.setBytesReceived(entry.getBytesReceived());
             status.setFlowFilesSent(entry.getFlowFilesSent());
             status.setBytesSent(entry.getBytesSent());
+            status.setFlowFilesRemoved(entry.getFlowFilesRemoved());
         }
 
         // determine the run status and get any validation errors... must check
@@ -2835,23 +2842,23 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     @Override
-    public void disableReferencingServices(final ControllerServiceNode 
serviceNode) {
-        controllerServiceProvider.disableReferencingServices(serviceNode);
+    public Set<ConfiguredComponent> disableReferencingServices(final 
ControllerServiceNode serviceNode) {
+        return 
controllerServiceProvider.disableReferencingServices(serviceNode);
     }
 
     @Override
-    public void enableReferencingServices(final ControllerServiceNode 
serviceNode) {
-        controllerServiceProvider.enableReferencingServices(serviceNode);
+    public Set<ConfiguredComponent> enableReferencingServices(final 
ControllerServiceNode serviceNode) {
+        return 
controllerServiceProvider.enableReferencingServices(serviceNode);
     }
 
     @Override
-    public void scheduleReferencingComponents(final ControllerServiceNode 
serviceNode) {
-        controllerServiceProvider.scheduleReferencingComponents(serviceNode);
+    public Set<ConfiguredComponent> scheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
+        return 
controllerServiceProvider.scheduleReferencingComponents(serviceNode);
     }
 
     @Override
-    public void unscheduleReferencingComponents(final ControllerServiceNode 
serviceNode) {
-        controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
+    public Set<ConfiguredComponent> unscheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
+        return 
controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
     }
 
     @Override
@@ -3744,7 +3751,13 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
 
             // create heartbeat message
-            final Heartbeat heartbeat = new Heartbeat(getNodeId(), 
bean.isPrimary(), bean.getConnectionStatus(), hbPayload.marshal());
+            final NodeIdentifier nodeId = getNodeId();
+            if (nodeId == null) {
+                LOG.warn("Cannot create Heartbeat Message because node's 
identifier is not known at this time");
+                return null;
+            }
+
+            final Heartbeat heartbeat = new Heartbeat(nodeId, 
bean.isPrimary(), bean.getConnectionStatus(), hbPayload.marshal());
             final HeartbeatMessage message = new HeartbeatMessage();
             message.setHeartbeat(heartbeat);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 45999a3..fb72683 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.controller;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -203,6 +205,18 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
     }
 
+    @Override
+    public void archiveFlow() throws IOException {
+        writeLock.lock();
+        try {
+            final File archiveFile = dao.createArchiveFile();
+            try (final OutputStream out = new FileOutputStream(archiveFile)) {
+                dao.load(out, true);
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
 
     @Override
     public void saveFlowChanges() throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java
index 9206054..7060699 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java
@@ -16,13 +16,15 @@
  */
 package org.apache.nifi.controller;
 
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.nifi.web.Revision;
+
+
 /**
  * Represents a data flow snippet.
  */
@@ -33,14 +35,14 @@ public class StandardSnippet implements Snippet {
     private String parentGroupId;
     private Boolean linked;
 
-    private final Set<String> processGroups = new HashSet<>();
-    private final Set<String> remoteProcessGroups = new HashSet<>();
-    private final Set<String> processors = new HashSet<>();
-    private final Set<String> inputPorts = new HashSet<>();
-    private final Set<String> outputPorts = new HashSet<>();
-    private final Set<String> connections = new HashSet<>();
-    private final Set<String> labels = new HashSet<>();
-    private final Set<String> funnels = new HashSet<>();
+    private Map<String, Revision> processGroups = new HashMap<>();
+    private Map<String, Revision> remoteProcessGroups = new HashMap<>();
+    private Map<String, Revision> processors = new HashMap<>();
+    private Map<String, Revision> inputPorts = new HashMap<>();
+    private Map<String, Revision> outputPorts = new HashMap<>();
+    private Map<String, Revision> connections = new HashMap<>();
+    private Map<String, Revision> labels = new HashMap<>();
+    private Map<String, Revision> funnels = new HashMap<>();
 
     @Override
     public String getId() {
@@ -74,75 +76,75 @@ public class StandardSnippet implements Snippet {
     }
 
     @Override
-    public Set<String> getConnections() {
-        return Collections.unmodifiableSet(connections);
+    public Map<String, Revision> getConnections() {
+        return Collections.unmodifiableMap(connections);
     }
 
-    public void addConnections(Collection<String> ids) {
-        connections.addAll(ids);
+    public void addConnections(Map<String, Revision> ids) {
+        connections.putAll(ids);
     }
 
     @Override
-    public Set<String> getFunnels() {
-        return Collections.unmodifiableSet(funnels);
+    public Map<String, Revision> getFunnels() {
+        return Collections.unmodifiableMap(funnels);
     }
 
-    public void addFunnels(Collection<String> ids) {
-        funnels.addAll(ids);
+    public void addFunnels(Map<String, Revision> ids) {
+        funnels.putAll(ids);
     }
 
     @Override
-    public Set<String> getInputPorts() {
-        return Collections.unmodifiableSet(inputPorts);
+    public Map<String, Revision> getInputPorts() {
+        return Collections.unmodifiableMap(inputPorts);
     }
 
-    public void addInputPorts(Collection<String> ids) {
-        inputPorts.addAll(ids);
+    public void addInputPorts(Map<String, Revision> ids) {
+        inputPorts.putAll(ids);
     }
 
     @Override
-    public Set<String> getOutputPorts() {
-        return Collections.unmodifiableSet(outputPorts);
+    public Map<String, Revision> getOutputPorts() {
+        return Collections.unmodifiableMap(outputPorts);
     }
 
-    public void addOutputPorts(Collection<String> ids) {
-        outputPorts.addAll(ids);
+    public void addOutputPorts(Map<String, Revision> ids) {
+        outputPorts.putAll(ids);
     }
 
     @Override
-    public Set<String> getLabels() {
-        return Collections.unmodifiableSet(labels);
+    public Map<String, Revision> getLabels() {
+        return Collections.unmodifiableMap(labels);
     }
 
-    public void addLabels(Collection<String> ids) {
-        labels.addAll(ids);
+    public void addLabels(Map<String, Revision> ids) {
+        labels.putAll(ids);
     }
 
     @Override
-    public Set<String> getProcessGroups() {
-        return Collections.unmodifiableSet(processGroups);
+    public Map<String, Revision> getProcessGroups() {
+        return Collections.unmodifiableMap(processGroups);
     }
 
-    public void addProcessGroups(Collection<String> ids) {
-        processGroups.addAll(ids);
+    public void addProcessGroups(Map<String, Revision> ids) {
+        processGroups.putAll(ids);
     }
 
     @Override
-    public Set<String> getProcessors() {
-        return Collections.unmodifiableSet(processors);
+    public Map<String, Revision> getProcessors() {
+        return Collections.unmodifiableMap(processors);
     }
 
-    public void addProcessors(Collection<String> ids) {
-        processors.addAll(ids);
+    public void addProcessors(Map<String, Revision> ids) {
+        processors.putAll(ids);
     }
 
     @Override
-    public Set<String> getRemoteProcessGroups() {
-        return Collections.unmodifiableSet(remoteProcessGroups);
+    public Map<String, Revision> getRemoteProcessGroups() {
+        return Collections.unmodifiableMap(remoteProcessGroups);
     }
 
-    public void addRemoteProcessGroups(Collection<String> ids) {
-        remoteProcessGroups.addAll(ids);
+    public void addRemoteProcessGroups(Map<String, Revision> ids) {
+        remoteProcessGroups.putAll(ids);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
new file mode 100644
index 0000000..8e4236b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses ZooKeeper in order to determine which node is the elected Cluster Coordinator and to indicate
+ * that this node is part of the cluster. However, once the Cluster Coordinator is known, heartbeats are
+ * sent directly to the Cluster Coordinator.
+ */
+public class ClusterProtocolHeartbeater implements Heartbeater {
+    private static final Logger logger = 
LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);
+
+    private final NodeProtocolSender protocolSender;
+    private final CuratorFramework curatorClient;
+    private final String nodesPathPrefix;
+
+    private final String coordinatorPath;
+    private volatile String coordinatorAddress;
+
+
+    public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, 
final Properties properties) {
+        this.protocolSender = protocolSender;
+
+        final RetryPolicy retryPolicy = new RetryForever(5000);
+        final ZooKeeperClientConfig zkConfig = 
ZooKeeperClientConfig.createConfig(properties);
+
+        curatorClient = 
CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
+            zkConfig.getSessionTimeoutMillis(), 
zkConfig.getConnectionTimeoutMillis(), retryPolicy);
+
+        curatorClient.start();
+        nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
+        coordinatorPath = nodesPathPrefix + "/coordinator";
+    }
+
+    private String getHeartbeatAddress() throws IOException {
+        final String curAddress = coordinatorAddress;
+        if (curAddress != null) {
+            return curAddress;
+        }
+
+        try {
+            // Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
+            final byte[] coordinatorAddressBytes = 
curatorClient.getData().usingWatcher(new Watcher() {
+                @Override
+                public void process(final WatchedEvent event) {
+                    coordinatorAddress = null;
+                }
+            }).forPath(coordinatorPath);
+            final String address = coordinatorAddress = new 
String(coordinatorAddressBytes, StandardCharsets.UTF_8);
+
+            logger.info("Determined that Cluster Coordinator is located at {}; 
will use this address for sending heartbeat messages", address);
+            return address;
+        } catch (Exception e) {
+            throw new IOException("Unable to determine Cluster Coordinator 
from ZooKeeper", e);
+        }
+    }
+
+
+    @Override
+    public synchronized void send(final HeartbeatMessage heartbeatMessage) 
throws IOException {
+        final String heartbeatAddress = getHeartbeatAddress();
+
+        try {
+            protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
+        } catch (final ProtocolException pe) {
+            // a ProtocolException is likely the result of not being able to communicate
+            // with the coordinator. If we do get an IOException communicating with the coordinator,
+            // it will be the cause of the Protocol Exception. In this case, set coordinatorAddress
+            // to null so that we double-check next time that the coordinator has not changed.
+            if (pe.getCause() instanceof IOException) {
+                coordinatorAddress = null;
+            }
+
+            throw pe;
+        }
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        if (curatorClient != null) {
+            curatorClient.close();
+        }
+
+        logger.info("ZooKeeper heartbeater closed. Will no longer send 
Heartbeat messages to ZooKeeper");
+    }
+}

Reply via email to