http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index fbf400b..dbcb8fd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -19,23 +19,17 @@ package org.apache.nifi.cluster.manager.impl;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,8 +44,6 @@ import java.util.regex.Pattern;
 
 import javax.net.ssl.SSLContext;
 import javax.ws.rs.HttpMethod;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.StreamingOutput;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
@@ -71,6 +63,12 @@ import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextImpl;
 import 
org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
+import 
org.apache.nifi.cluster.coordination.http.replication.AsyncClusterResponse;
+import 
org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
+import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import 
org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.event.EventManager;
@@ -80,10 +78,7 @@ import org.apache.nifi.cluster.flow.DaoException;
 import org.apache.nifi.cluster.flow.DataFlowManagementService;
 import org.apache.nifi.cluster.flow.PersistedFlowState;
 import org.apache.nifi.cluster.manager.HttpClusterManager;
-import org.apache.nifi.cluster.manager.HttpRequestReplicator;
-import org.apache.nifi.cluster.manager.HttpResponseMapper;
 import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.manager.StatusMerger;
 import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException;
 import 
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
 import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
@@ -116,6 +111,7 @@ import 
org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
@@ -123,8 +119,6 @@ import org.apache.nifi.controller.StandardFlowSerializer;
 import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
-import org.apache.nifi.controller.queue.DropFlowFileState;
-import org.apache.nifi.controller.queue.ListFlowFileState;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
 import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
@@ -135,21 +129,12 @@ import 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
 import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
-import org.apache.nifi.controller.state.SortedStateUtils;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
-import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
-import org.apache.nifi.controller.status.history.MetricDescriptor;
-import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
-import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
-import 
org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
-import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
-import org.apache.nifi.controller.status.history.StatusHistoryUtil;
-import org.apache.nifi.controller.status.history.StatusSnapshot;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
@@ -171,9 +156,7 @@ import 
org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
 import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.ReportingTask;
@@ -187,74 +170,7 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.UpdateRevision;
-import org.apache.nifi.web.api.dto.BulletinBoardDTO;
 import org.apache.nifi.web.api.dto.BulletinDTO;
-import org.apache.nifi.web.api.dto.ComponentStateDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
-import org.apache.nifi.web.api.dto.CountersDTO;
-import org.apache.nifi.web.api.dto.DropRequestDTO;
-import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.ListingRequestDTO;
-import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO;
-import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.QueueSizeDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-import org.apache.nifi.web.api.dto.ReportingTaskDTO;
-import org.apache.nifi.web.api.dto.StateEntryDTO;
-import org.apache.nifi.web.api.dto.StateMapDTO;
-import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
-import 
org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
-import org.apache.nifi.web.api.dto.status.PortStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
-import org.apache.nifi.web.api.entity.BulletinBoardEntity;
-import org.apache.nifi.web.api.entity.ComponentStateEntity;
-import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
-import org.apache.nifi.web.api.entity.ControllerServiceEntity;
-import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
-import org.apache.nifi.web.api.entity.ControllerServicesEntity;
-import org.apache.nifi.web.api.entity.ControllerStatusEntity;
-import org.apache.nifi.web.api.entity.CountersEntity;
-import org.apache.nifi.web.api.entity.DropRequestEntity;
-import org.apache.nifi.web.api.entity.FlowSnippetEntity;
-import org.apache.nifi.web.api.entity.ListingRequestEntity;
-import org.apache.nifi.web.api.entity.PortStatusEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
-import org.apache.nifi.web.api.entity.ProcessorsEntity;
-import org.apache.nifi.web.api.entity.ProvenanceEntity;
-import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
-import org.apache.nifi.web.api.entity.ReportingTaskEntity;
-import org.apache.nifi.web.api.entity.ReportingTasksEntity;
-import org.apache.nifi.web.api.entity.StatusHistoryEntity;
-import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
 import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -265,7 +181,8 @@ import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
-import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
 
 /**
  * Provides a cluster manager implementation. The manager federates incoming 
HTTP client requests to the nodes' external API using the HTTP protocol. The 
manager also communicates with nodes using the
@@ -278,7 +195,7 @@ import com.sun.jersey.api.client.ClientResponse;
  * The start() and stop() methods must be called to initialize and stop the 
instance.
  *
  */
-public class WebClusterManager implements HttpClusterManager, ProtocolHandler, 
ControllerServiceProvider, ReportingTaskProvider {
+public class WebClusterManager implements HttpClusterManager, ProtocolHandler, 
ControllerServiceProvider, ReportingTaskProvider, RequestCompletionCallback {
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String BULLETIN_CATEGORY = "Clustering";
@@ -322,58 +239,13 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
 
 
-    public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
-    public static final Pattern PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}");
-    public static final Pattern PROCESSOR_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status");
-    public static final Pattern PROCESSOR_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state");
     public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
 
-    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
-    public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/remote-process-groups/[a-f0-9\\-]{36}");
-
-    public static final Pattern PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
-    public static final Pattern GROUP_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status");
-    public static final Pattern CONTROLLER_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/status");
-    public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
-    public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
-
-    public static final String PROVENANCE_URI = "/nifi-api/provenance";
-    public static final Pattern PROVENANCE_QUERY_URI = 
Pattern.compile("/nifi-api/provenance/[a-f0-9\\-]{36}");
-    public static final Pattern PROVENANCE_EVENT_URI = 
Pattern.compile("/nifi-api/provenance/events/[0-9]+");
-
-    public static final Pattern CONTROLLER_SERVICES_URI = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/controller-services/node");
-    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}");
-    public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}/state");
-    public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}/references");
-    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks/node";
-    public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}");
-    public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}/state");
-    public static final Pattern BULLETIN_BOARD_URI_PATTERN = 
Pattern.compile("/nifi-api/bulletin-board");
-    public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = 
Pattern.compile("/nifi-api/system-diagnostics");
-    public static final Pattern COUNTERS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/counters");
-    public static final Pattern COUNTER_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
-
-    public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN =
-        
Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status/history");
-    public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history");
-    public static final Pattern 
REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern
-        
.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history");
-    public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern
-        .compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status/history");
 
-    public static final Pattern CONNECTION_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status");
-    public static final Pattern INPUT_PORT_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/input-ports/[a-f0-9\\-]{36}/status");
-    public static final Pattern OUTPUT_PORT_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/output-ports/[a-f0-9\\-]{36}/status");
-    public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status");
-
-    public static final Pattern DROP_REQUESTS_URI = 
Pattern.compile("/nifi-api/connections/[a-f0-9\\-]{36}/drop-requests");
-    public static final Pattern DROP_REQUEST_URI = 
Pattern.compile("/nifi-api/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
-    public static final Pattern LISTING_REQUESTS_URI = 
Pattern.compile("/nifi-api/connections/[a-f0-9\\-]{36}/listing-requests");
-    public static final Pattern LISTING_REQUEST_URI = 
Pattern.compile("/nifi-api/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}");
+    public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+    public static final Pattern COUNTER_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
 
     private final NiFiProperties properties;
-    private final HttpRequestReplicator httpRequestReplicator;
-    private final HttpResponseMapper httpResponseMapper;
     private final DataFlowManagementService dataFlowManagementService;
     private final ClusterManagerProtocolSenderListener senderListener;
     private final OptimisticLockingManager optimisticLockingManager;
@@ -404,18 +276,15 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     private final FlowEngine reportingTaskEngine;
     private final StandardProcessScheduler processScheduler;
     private final StateManagerProvider stateManagerProvider;
-    private final long componentStatusSnapshotMillis;
 
+    private final HttpResponseMerger responseMerger = new 
StandardHttpResponseMerger(this);
+    private final RequestReplicator httpRequestReplicator;
 
-    public WebClusterManager(final HttpRequestReplicator 
httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
+    public WebClusterManager(
             final DataFlowManagementService dataFlowManagementService, final 
ClusterManagerProtocolSenderListener senderListener,
             final NiFiProperties properties, final StringEncryptor encryptor, 
final OptimisticLockingManager optimisticLockingManager) {
 
-        if (httpRequestReplicator == null) {
-            throw new IllegalArgumentException("HttpRequestReplicator may not 
be null.");
-        } else if (httpResponseMapper == null) {
-            throw new IllegalArgumentException("HttpResponseMapper may not be 
null.");
-        } else if (dataFlowManagementService == null) {
+        if (dataFlowManagementService == null) {
             throw new IllegalArgumentException("DataFlowManagementService may 
not be null.");
         } else if (senderListener == null) {
             throw new 
IllegalArgumentException("ClusterManagerProtocolSenderListener may not be 
null.");
@@ -423,9 +292,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             throw new IllegalArgumentException("NiFiProperties may not be 
null.");
         }
 
-        // Ensure that our encryptor/decryptor is properly initialized
-        this.httpRequestReplicator = httpRequestReplicator;
-        this.httpResponseMapper = httpResponseMapper;
         this.dataFlowManagementService = dataFlowManagementService;
         this.properties = properties;
         this.bulletinRepository = new VolatileBulletinRepository();
@@ -436,15 +302,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         senderListener.addHandler(this);
         senderListener.setBulletinRepository(bulletinRepository);
 
-        final String snapshotFrequency = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, 
NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
-        long snapshotMillis;
-        try {
-            snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, 
TimeUnit.MILLISECONDS);
-        } catch (final Exception e) {
-            snapshotMillis = 
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY,
 TimeUnit.MILLISECONDS);
-        }
-        componentStatusSnapshotMillis = snapshotMillis;
-
         remoteInputPort = properties.getRemoteInputPort();
         if (remoteInputPort == null) {
             remoteSiteListener = null;
@@ -485,9 +342,33 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
         controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler, bulletinRepository, 
stateManagerProvider);
 
-        clusterCoordinator = new WebClusterManagerCoordinator(this, 
senderListener);
+        clusterCoordinator = new WebClusterManagerCoordinator(this, 
senderListener, dataFlowManagementService);
         heartbeatMonitor = new 
ClusterProtocolHeartbeatMonitor(clusterCoordinator, properties);
         senderListener.addHandler(heartbeatMonitor);
+        httpRequestReplicator = createRequestReplicator(properties);
+    }
+
+    private RequestReplicator createRequestReplicator(final NiFiProperties 
properties) {
+        final int numThreads = 
properties.getClusterManagerNodeApiRequestThreads();
+        final String connectionTimeout = 
properties.getClusterManagerNodeApiConnectionTimeout();
+        final String readTimeout = 
properties.getClusterManagerNodeApiReadTimeout();
+        final EventReporter eventReporter = createEventReporter();
+
+        final Client jerseyClient = WebUtils.createClient(new 
DefaultClientConfig(), SslContextFactory.createSslContext(properties));
+        return new ThreadPoolRequestReplicator(numThreads, jerseyClient, 
clusterCoordinator, connectionTimeout, readTimeout, this,
+            eventReporter, this, optimisticLockingManager, 
dataFlowManagementService);
+    }
+
+    private EventReporter createEventReporter() {
+        return new EventReporter() {
+            private static final long serialVersionUID = 7770887158588031619L;
+
+            @Override
+            public void reportEvent(Severity severity, String category, String 
message) {
+                final Bulletin bulletin = 
BulletinFactory.createBulletin(category, severity.name(), message);
+                getBulletinRepository().addBulletin(bulletin);
+            }
+        };
     }
 
     public void start() throws IOException {
@@ -1335,23 +1216,23 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     }
 
     @Override
-    public void disableReferencingServices(final ControllerServiceNode 
serviceNode) {
-        controllerServiceProvider.disableReferencingServices(serviceNode);
+    public Set<ConfiguredComponent> disableReferencingServices(final 
ControllerServiceNode serviceNode) {
+        return 
controllerServiceProvider.disableReferencingServices(serviceNode);
     }
 
     @Override
-    public void enableReferencingServices(final ControllerServiceNode 
serviceNode) {
-        controllerServiceProvider.enableReferencingServices(serviceNode);
+    public Set<ConfiguredComponent> enableReferencingServices(final 
ControllerServiceNode serviceNode) {
+        return 
controllerServiceProvider.enableReferencingServices(serviceNode);
     }
 
     @Override
-    public void scheduleReferencingComponents(final ControllerServiceNode 
serviceNode) {
-        controllerServiceProvider.scheduleReferencingComponents(serviceNode);
+    public Set<ConfiguredComponent> scheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
+        return 
controllerServiceProvider.scheduleReferencingComponents(serviceNode);
     }
 
     @Override
-    public void unscheduleReferencingComponents(final ControllerServiceNode 
serviceNode) {
-        controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
+    public Set<ConfiguredComponent> unscheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
+        return 
controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
     }
 
     @Override
@@ -1676,18 +1557,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
         lock.lock();
         try {
-            // check that the request can be applied
-            if (mutableRequest) {
-                if (isInSafeMode()) {
-                    throw new SafeModeMutableRequestException("Received a 
mutable request [" + method + " -- " + uri + "] while in safe mode");
-                } else if (!getNodeIds(Status.DISCONNECTED, 
Status.DISCONNECTING).isEmpty()) {
-                    throw new 
DisconnectedNodeMutableRequestException("Received a mutable request [" + method 
+ " -- " + uri + "] while a node is disconnected from the cluster");
-                } else if (!getNodeIds(Status.CONNECTING).isEmpty()) {
-                    // if any node is connecting and a request can change the 
flow, then we throw an exception
-                    throw new ConnectingNodeMutableRequestException("Received 
a mutable request [" + method + " -- " + uri + "] while a node is trying to 
connect to the cluster");
-                }
-            }
-
             final NodeResponse clientResponse = federateRequest(method, uri, 
parameters, null, headers, nodeIdentifiers);
             if (clientResponse == null) {
                 if (mutableRequest) {
@@ -1913,55 +1782,20 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 final String serializedClusterCtx = 
WebUtils.serializeObjectToHex(clusterCtx);
                 updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, 
serializedClusterCtx);
 
-                // if the request is mutable, we need to verify that it is a 
valid request for all nodes in the cluster.
-                if (mutableRequest) {
-                    updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, 
"150-NodeContinue");
-
-                    final Set<NodeResponse> nodeResponses;
-                    if (entity == null) {
-                        nodeResponses = 
httpRequestReplicator.replicate(nodeIds, method, uri, parameters, 
updatedHeaders);
-                    } else {
-                        nodeResponses = 
httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
-                    }
-
-                    updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER);
-
-                    for (final NodeResponse response : nodeResponses) {
-                        if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) 
{
-                            final String nodeDescription = 
response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort();
-                            final ClientResponse clientResponse = 
response.getClientResponse();
-                            if (clientResponse == null) {
-                                throw new IllegalClusterStateException("Node " 
+ nodeDescription + " is unable to fulfill this request due to: Unexpected 
Response Code " + response.getStatus());
-                            }
-                            final String nodeExplanation = 
clientResponse.getEntity(String.class);
-                            throw new IllegalClusterStateException("Node " + 
nodeDescription + " is unable to fulfill this request due to: " + 
nodeExplanation, response.getThrowable());
-                        }
-                    }
-
-                    // set flow state to unknown to denote a mutable request 
replication in progress
-                    logger.debug("Setting Flow State to UNKNOWN due to mutable 
request to {} {}", method, uri);
-                    
notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN);
-                }
-
                 // replicate request
-                final Set<NodeResponse> nodeResponses;
-                try {
-                    if (entity == null) {
-                        nodeResponses = 
httpRequestReplicator.replicate(nodeIds, method, uri, parameters, 
updatedHeaders);
-                    } else {
-                        nodeResponses = 
httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
-                    }
-                } catch (final UriConstructionException uce) {
-                    // request was not replicated, so mark the flow with its 
original state
-                    if (mutableRequest) {
-                        
notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState);
-                    }
+                final AsyncClusterResponse clusterResponse = 
httpRequestReplicator.replicate(nodeIds, method, uri, entity == null ? 
parameters : entity, updatedHeaders);
 
-                    throw uce;
+                final NodeResponse clientResponse;
+                try {
+                    clientResponse = clusterResponse.awaitMergedResponse();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    logger.warn("Thread was interrupted while waiting for a 
response from one or more nodes", e);
+                    final Set<NodeIdentifier> noResponses = 
clusterResponse.getNodesInvolved();
+                    
noResponses.removeAll(clusterResponse.getCompletedNodeIdentifiers());
+                    throw new IllegalClusterStateException("Interrupted while 
waiting for a response from the following nodes: " + noResponses, e);
                 }
 
-                // merge the response
-                final NodeResponse clientResponse = mergeResponses(uri, 
method, nodeResponses, mutableRequest);
                 holder.set(clientResponse);
 
                 // if we have a response get the updated cluster context for 
auditing and revision updating
@@ -2009,1973 +1843,240 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return holder.get();
     }
 
-    private static boolean isProcessorsEndpoint(final URI uri, final String 
method) {
-        return "GET".equalsIgnoreCase(method) && 
PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
-
-    private static boolean isProcessorEndpoint(final URI uri, final String 
method) {
-        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method))
-                && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || 
CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
-            return true;
-        } else if ("POST".equalsIgnoreCase(method) && 
PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
-            return true;
-        }
 
-        return false;
+    private static boolean isCounterEndpoint(final String uriPath) {
+        return COUNTER_URI_PATTERN.matcher(uriPath).matches();
     }
 
-    private static boolean isProcessorStatusEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
 
-    private static boolean isProcessorStateEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+    public List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> 
nodeBulletins, final List<Bulletin> ncmBulletins) {
+        if (ncmBulletins == null || ncmBulletins.isEmpty()) {
+            return nodeBulletins;
+        }
 
-    private static boolean isProcessGroupEndpoint(final URI uri, final String 
method) {
-        return ("GET".equalsIgnoreCase(method) || 
"PUT".equalsIgnoreCase(method)) && 
PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
+        final List<BulletinDTO> mergedBulletins = new 
ArrayList<>(nodeBulletins.size() + ncmBulletins.size());
+        mergedBulletins.addAll(nodeBulletins);
+        mergedBulletins.addAll(createBulletinDtos(ncmBulletins));
+        return mergedBulletins;
     }
 
-    private static boolean isConnectionStatusEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
 
-    private static boolean isInputPortStatusEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
INPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    /**
+     * Creates BulletinDTOs for the specified Bulletins.
+     *
+     * @param bulletins bulletin
+     * @return dto
+     */
+    public List<BulletinDTO> createBulletinDtos(final List<Bulletin> 
bulletins) {
+        final List<BulletinDTO> bulletinDtos = new 
ArrayList<>(bulletins.size());
+        for (final Bulletin bulletin : bulletins) {
+            bulletinDtos.add(createBulletinDto(bulletin));
+        }
+        return bulletinDtos;
     }
 
-    private static boolean isOutputPortStatusEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
OUTPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    /**
+     * Creates a BulletinDTO for the specified Bulletin.
+     *
+     * @param bulletin bulletin
+     * @return dto
+     */
+    public BulletinDTO createBulletinDto(final Bulletin bulletin) {
+        final BulletinDTO dto = new BulletinDTO();
+        dto.setId(bulletin.getId());
+        dto.setNodeAddress(bulletin.getNodeAddress());
+        dto.setTimestamp(bulletin.getTimestamp());
+        dto.setGroupId(bulletin.getGroupId());
+        dto.setSourceId(bulletin.getSourceId());
+        dto.setSourceName(bulletin.getSourceName());
+        dto.setCategory(bulletin.getCategory());
+        dto.setLevel(bulletin.getLevel());
+        dto.setMessage(bulletin.getMessage());
+        return dto;
     }
 
-    private static boolean isRemoteProcessGroupStatusEndpoint(final URI uri, 
final String method) {
-        return "GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
 
-    private static boolean isGroupStatusEndpoint(final URI uri, final String 
method) {
-        return "GET".equalsIgnoreCase(method) && 
GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    /**
+     * Merges the validation errors into the specified map, recording the 
corresponding node identifier.
+     *
+     * @param validationErrorMap map
+     * @param nodeId id
+     * @param nodeValidationErrors errors
+     */
+    public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> 
validationErrorMap, final NodeIdentifier nodeId, final Collection<String> 
nodeValidationErrors) {
+        if (nodeValidationErrors != null) {
+            for (final String nodeValidationError : nodeValidationErrors) {
+                Set<NodeIdentifier> nodeSet = 
validationErrorMap.get(nodeValidationError);
+                if (nodeSet == null) {
+                    nodeSet = new HashSet<>();
+                    validationErrorMap.put(nodeValidationError, nodeSet);
+                }
+                nodeSet.add(nodeId);
+            }
+        }
     }
 
-    private static boolean isControllerStatusEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+    /**
+     * Normalizes the validation errors by prepending the corresponding nodes 
when the error does not exist across all nodes.
+     *
+     * @param validationErrorMap map
+     * @param totalNodes total
+     * @return normalized errors
+     */
+    public Set<String> normalizedMergedValidationErrors(final Map<String, 
Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
+        final Set<String> normalizedValidationErrors = new HashSet<>();
+        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : 
validationErrorMap.entrySet()) {
+            final String msg = validationEntry.getKey();
+            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
 
-    private static boolean isTemplateEndpoint(final URI uri, final String 
method) {
-        return "POST".equalsIgnoreCase(method) && 
TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches();
+            if (nodeIds.size() == totalNodes) {
+                normalizedValidationErrors.add(msg);
+            } else {
+                for (final NodeIdentifier nodeId : nodeIds) {
+                    normalizedValidationErrors.add(nodeId.getApiAddress() + 
":" + nodeId.getApiPort() + " -- " + msg);
+                }
+            }
+        }
+        return normalizedValidationErrors;
     }
 
-    private static boolean isFlowSnippetEndpoint(final URI uri, final String 
method) {
-        return "POST".equalsIgnoreCase(method) && 
FLOW_SNIPPET_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
 
-    private static boolean isRemoteProcessGroupsEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+    // TODO: This is temporary. Only here because while we have NCM, we must 
merge its bulletins. Once we get rid
+    // of the NCM, this goes away completely.
+    public static final Pattern CONTROLLER_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/status");
 
-    private static boolean isProcessorStatusHistoryEndpoint(final URI uri, 
final String method) {
-        return "GET".equalsIgnoreCase(method) && 
PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    public boolean isControllerStatusEndpoint(final URI uri, final String 
method) {
+        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
-    private static boolean isProcessGroupStatusHistoryEndpoint(final URI uri, 
final String method) {
-        return "GET".equalsIgnoreCase(method) && 
PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
 
-    private static boolean isRemoteProcessGroupStatusHistoryEndpoint(final URI 
uri, final String method) {
-        return "GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+    @Override
+    public void afterRequest(final String uriPath, final String method, final 
Set<NodeResponse> nodeResponses) {
+        final boolean mutableRequest = canChangeNodeState(method, null);
 
-    private static boolean isConnectionStatusHistoryEndpoint(final URI uri, 
final String method) {
-        return "GET".equalsIgnoreCase(method) && 
CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+        /*
+         * Nodes that encountered issues handling the request are marked as
+         * disconnected for mutable requests (e.g., post, put, delete).  For
+         * other requests (e.g., get, head), the nodes remain in their current
+         * state even if they had problems handling the request.
+         */
+        if (mutableRequest) {
+            final Set<NodeResponse> problematicNodeResponses = 
responseMerger.getProblematicNodeResponses(nodeResponses);
 
-    private static boolean isBulletinBoardEndpoint(final URI uri, final String 
method) {
-        return "GET".equalsIgnoreCase(method) && 
BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+            // all nodes failed
+            final boolean allNodesFailed = problematicNodeResponses.size() == 
nodeResponses.size();
 
-    private static boolean isSystemDiagnosticsEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+            // some nodes had a problematic response because of a missing 
counter, ensure the are not disconnected
+            final boolean someNodesFailedMissingCounter = 
!problematicNodeResponses.isEmpty()
+                && problematicNodeResponses.size() < nodeResponses.size() && 
isMissingCounter(problematicNodeResponses, uriPath);
 
-    private static boolean isCountersEndpoint(final URI uri, final String 
method) {
-        return "GET".equalsIgnoreCase(method) && 
COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+            // ensure nodes stay connected in certain scenarios
+            if (allNodesFailed || someNodesFailedMissingCounter) {
+                for (final NodeResponse nodeResponse : nodeResponses) {
+                    final Node node = 
getRawNode(nodeResponse.getNodeId().getId());
 
+                    if (problematicNodeResponses.contains(nodeResponse)) {
+                        node.setStatus(Status.CONNECTED);
+                        problematicNodeResponses.remove(nodeResponse);
+                    }
+                }
+            }
 
-    private static boolean isRemoteProcessGroupEndpoint(final URI uri, final 
String method) {
-        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) {
-            return true;
-        } else if ("POST".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) {
-            return true;
+            // disconnect problematic nodes
+            if (!problematicNodeResponses.isEmpty()) {
+                if (problematicNodeResponses.size() < nodeResponses.size()) {
+                    logger.warn(String.format("The following nodes failed to 
process URI '%s'.  Requesting each node to disconnect from cluster: ", uriPath, 
problematicNodeResponses));
+                    disconnectNodes(problematicNodeResponses, "Failed to 
process URI " + uriPath);
+                } else {
+                    logger.warn("All nodes failed to process URI {}. As a 
result, no node will be disconnected from cluster", uriPath);
+                }
+            }
         }
-
-        return false;
     }
 
-    private static boolean isProvenanceQueryEndpoint(final URI uri, final 
String method) {
-        if ("POST".equalsIgnoreCase(method) && 
PROVENANCE_URI.equals(uri.getPath())) {
-            return true;
-        } else if ("GET".equalsIgnoreCase(method) && 
PROVENANCE_QUERY_URI.matcher(uri.getPath()).matches()) {
-            return true;
+
+    /**
+     * Determines if all problematic responses were due to 404 NOT_FOUND. 
Assumes that problematicNodeResponses is not empty and is not comprised of 
responses from all nodes in the cluster (at least
+     * one node contained the counter in question).
+     *
+     * @param problematicNodeResponses The problematic node responses
+     * @param uriPath The path of the URI for the request
+     * @return Whether all problematic node responses were due to a missing 
counter
+     */
+    private boolean isMissingCounter(final Set<NodeResponse> 
problematicNodeResponses, final String uriPath) {
+        if (isCounterEndpoint(uriPath)) {
+            boolean notFound = true;
+            for (final NodeResponse problematicResponse : 
problematicNodeResponses) {
+                if (problematicResponse.getStatus() != 404) {
+                    notFound = false;
+                    break;
+                }
+            }
+            return notFound;
         }
         return false;
     }
 
-    private static boolean isProvenanceEventEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
-    }
 
-    private static boolean isListFlowFilesEndpoint(final URI uri, final String 
method) {
-        if (("GET".equalsIgnoreCase(method) || 
"DELETE".equalsIgnoreCase(method)) && 
LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) {
-            return true;
-        } else if ("POST".equalsIgnoreCase(method) && 
LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) {
-            return true;
+    /**
+     * A helper method to disconnect nodes that returned unsuccessful HTTP 
responses because of a replicated request. Disconnection requests are sent 
concurrently.
+     *
+     */
+    private void disconnectNodes(final Set<NodeResponse> nodeResponses, final 
String explanation) {
+        // return fast if nothing to do
+        if (nodeResponses == null || nodeResponses.isEmpty()) {
+            return;
         }
 
-        return false;
-    }
-
-    private static boolean isCounterEndpoint(final URI uri) {
-        return COUNTER_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
+        final ExecutorService executorService = 
Executors.newFixedThreadPool(properties.getClusterManagerProtocolThreads());
+        final CompletionService<Void> completionService = new 
ExecutorCompletionService<>(executorService);
+        for (final NodeResponse nodeResponse : nodeResponses) {
+            completionService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    final NodeIdentifier nodeId = nodeResponse.getNodeId();
+                    final int responseStatus = nodeResponse.getStatus();
+                    final URI requestUri = nodeResponse.getRequestUri();
+                    final StringBuilder msgBuilder = new StringBuilder();
+                    msgBuilder
+                            .append("Requesting disconnection for node ")
+                            .append(nodeId)
+                            .append(" for request URI ")
+                            .append(requestUri);
+                    if (nodeResponse.hasThrowable()) {
+                        msgBuilder.append(" because manager encountered 
exception when issuing request: ")
+                                .append(nodeResponse.getThrowable());
+                        // log stack trace anytime we have a throwable
+                        ((NiFiLog) 
logger).getWrappedLog().info(msgBuilder.toString(), 
nodeResponse.getThrowable());
+                        addEvent(nodeId, "Manager encountered exception when 
issuing request for URI " + requestUri);
+                        addBulletin(nodeId, Severity.ERROR, "Manager 
encountered exception when issuing request for URI " + requestUri + "; node 
will be disconnected");
+                    } else {
+                        msgBuilder.append(" because HTTP response status was ")
+                                .append(responseStatus);
+                        logger.info(msgBuilder.toString());
+                        addEvent(nodeId, "HTTP response status was 
unsuccessful (" + responseStatus + ") for request URI " + requestUri);
+                        addBulletin(nodeId, Severity.ERROR, "HTTP response 
status was unsuccessful (" + responseStatus + ") for request URI " + 
requestUri);
+                    }
+                    requestDisconnectionQuietly(nodeId, explanation);
+                }
+            }, null);
+        }
 
-    private static boolean isControllerServicesEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_SERVICES_URI.matcher(uri.getPath()).matches();
+        executorService.shutdown();
     }
 
-    private static boolean isControllerServiceEndpoint(final URI uri, final 
String method) {
-        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
-            return true;
-        } else if ("POST".equalsIgnoreCase(method) && 
CONTROLLER_SERVICES_URI.matcher(uri.getPath()).matches()) {
-            return true;
-        }
-
-        return false;
-    }
-
-    private static boolean isControllerServiceStateEndpoint(final URI uri, 
final String method) {
-        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
-
-    private static boolean isControllerServiceReferenceEndpoint(final URI uri, 
final String method) {
-        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
-            return true;
-        }
-
-        return false;
-    }
-
-    private static boolean isReportingTasksEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
REPORTING_TASKS_URI.equals(uri.getPath());
-    }
-
-    private static boolean isReportingTaskEndpoint(final URI uri, final String 
method) {
-        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) {
-            return true;
-        } else if ("POST".equalsIgnoreCase(method) && 
REPORTING_TASKS_URI.equals(uri.getPath())) {
-            return true;
-        }
-
-        return false;
-    }
-
-    private static boolean isReportingTaskStateEndpoint(final URI uri, final 
String method) {
-        return "GET".equalsIgnoreCase(method) && 
REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
-    }
-
-    private static boolean isDropRequestEndpoint(final URI uri, final String 
method) {
-        if (("GET".equalsIgnoreCase(method) || 
"DELETE".equalsIgnoreCase(method)) && 
DROP_REQUEST_URI.matcher(uri.getPath()).matches()) {
-            return true;
-        } else if (("POST".equalsIgnoreCase(method) && 
DROP_REQUESTS_URI.matcher(uri.getPath()).matches())) {
-            return true;
-        }
-
-        return false;
-    }
-
-    static boolean isResponseInterpreted(final URI uri, final String method) {
-        return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, 
method) || isProcessorStateEndpoint(uri, method)
-                || isRemoteProcessGroupsEndpoint(uri, method) || 
isRemoteProcessGroupEndpoint(uri, method)
-                || isProcessGroupEndpoint(uri, method)
-                || isTemplateEndpoint(uri, method) || 
isFlowSnippetEndpoint(uri, method)
-                || isProvenanceQueryEndpoint(uri, method) || 
isProvenanceEventEndpoint(uri, method)
-                || isControllerServicesEndpoint(uri, method) || 
isControllerServiceEndpoint(uri, method)
-                || isControllerServiceReferenceEndpoint(uri, method) || 
isControllerServiceStateEndpoint(uri, method)
-                || isReportingTasksEndpoint(uri, method) || 
isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, 
method)
-                || isDropRequestEndpoint(uri, method) || 
isListFlowFilesEndpoint(uri, method)
-                || isGroupStatusEndpoint(uri, method) || 
isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, 
method)
-                || isConnectionStatusEndpoint(uri, method) || 
isRemoteProcessGroupStatusEndpoint(uri, method)
-                || isInputPortStatusEndpoint(uri, method) || 
isOutputPortStatusEndpoint(uri, method)
-                || isProcessorStatusHistoryEndpoint(uri, method) || 
isProcessGroupStatusHistoryEndpoint(uri, method)
-                || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || 
isConnectionStatusHistoryEndpoint(uri, method)
-                || isBulletinBoardEndpoint(uri, method) || 
isSystemDiagnosticsEndpoint(uri, method)
-                || isCountersEndpoint(uri, method);
-    }
-
-    private void mergeProcessorValidationErrors(final ProcessorDTO processor, 
Map<NodeIdentifier, ProcessorDTO> processorMap) {
-        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
-
-        for (final Map.Entry<NodeIdentifier, ProcessorDTO> nodeEntry : 
processorMap.entrySet()) {
-            final NodeIdentifier nodeId = nodeEntry.getKey();
-            final ProcessorDTO nodeProcessor = nodeEntry.getValue();
-
-            // merge the validation errors
-            mergeValidationErrors(validationErrorMap, nodeId, 
nodeProcessor.getValidationErrors());
-        }
-
-        // set the merged the validation errors
-        
processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 processorMap.size()));
-    }
-
-    private void mergeComponentState(final ComponentStateDTO componentState, 
Map<NodeIdentifier, ComponentStateDTO> componentStateMap) {
-        List<StateEntryDTO> localStateEntries = new ArrayList<>();
-
-        int totalStateEntries = 0;
-        for (final Map.Entry<NodeIdentifier, ComponentStateDTO> nodeEntry : 
componentStateMap.entrySet()) {
-            final ComponentStateDTO nodeComponentState = nodeEntry.getValue();
-            final NodeIdentifier nodeId = nodeEntry.getKey();
-            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
-
-            final StateMapDTO nodeLocalStateMap = 
nodeComponentState.getLocalState();
-            if (nodeLocalStateMap.getState() != null) {
-                totalStateEntries += nodeLocalStateMap.getTotalEntryCount();
-
-                for (final StateEntryDTO nodeStateEntry : 
nodeLocalStateMap.getState()) {
-                    nodeStateEntry.setClusterNodeId(nodeId.getId());
-                    nodeStateEntry.setClusterNodeAddress(nodeAddress);
-                    localStateEntries.add(nodeStateEntry);
-                }
-            }
-        }
-
-        // ensure appropriate sort
-        Collections.sort(localStateEntries, 
SortedStateUtils.getEntryDtoComparator());
-
-        // sublist if necessary
-        if (localStateEntries.size() > 
SortedStateUtils.MAX_COMPONENT_STATE_ENTRIES) {
-            localStateEntries = localStateEntries.subList(0, 
SortedStateUtils.MAX_COMPONENT_STATE_ENTRIES);
-        }
-
-        // add all the local state entries
-        componentState.getLocalState().setTotalEntryCount(totalStateEntries);
-        componentState.getLocalState().setState(localStateEntries);
-    }
-
-
-    private void mergeSystemDiagnostics(final SystemDiagnosticsDTO target, 
final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, 
SystemDiagnosticsDTO> resultMap) {
-        final SystemDiagnosticsDTO mergedSystemDiagnostics = target;
-        mergedSystemDiagnostics.setNodeSnapshots(new 
ArrayList<NodeSystemDiagnosticsSnapshotDTO>());
-
-        final NodeSystemDiagnosticsSnapshotDTO selectedNodeSnapshot = new 
NodeSystemDiagnosticsSnapshotDTO();
-        
selectedNodeSnapshot.setSnapshot(target.getAggregateSnapshot().clone());
-        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
-        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
-        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
-
-        mergedSystemDiagnostics.getNodeSnapshots().add(selectedNodeSnapshot);
-
-        for (final Map.Entry<NodeIdentifier, SystemDiagnosticsDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final SystemDiagnosticsDTO toMerge = entry.getValue();
-            if (toMerge == target) {
-                continue;
-            }
-
-            StatusMerger.merge(mergedSystemDiagnostics, toMerge, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
-        }
-    }
-
-    private void mergeCounters(final CountersDTO target, final NodeIdentifier 
selectedNodeId, final Map<NodeIdentifier, CountersDTO> resultMap) {
-        final CountersDTO mergedCounters = target;
-        mergedCounters.setNodeSnapshots(new 
ArrayList<NodeCountersSnapshotDTO>());
-
-        final NodeCountersSnapshotDTO selectedNodeSnapshot = new 
NodeCountersSnapshotDTO();
-        
selectedNodeSnapshot.setSnapshot(target.getAggregateSnapshot().clone());
-        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
-        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
-        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
-
-        mergedCounters.getNodeSnapshots().add(selectedNodeSnapshot);
-
-        for (final Map.Entry<NodeIdentifier, CountersDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final CountersDTO toMerge = entry.getValue();
-            if (toMerge == target) {
-                continue;
-            }
-
-            StatusMerger.merge(mergedCounters, toMerge, nodeId.getId(), 
nodeId.getApiAddress(), nodeId.getApiPort());
-        }
-    }
-
-    private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final 
NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessGroupStatusDTO> 
resultMap) {
-        final ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto;
-        mergedProcessGroupStatus.setNodeSnapshots(new 
ArrayList<NodeProcessGroupStatusSnapshotDTO>());
-
-        final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new 
NodeProcessGroupStatusSnapshotDTO();
-        
selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
-        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
-        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
-        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
-
-        mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot);
-
-        for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final ProcessGroupStatusDTO nodeProcessGroupStatus = 
entry.getValue();
-            if (nodeProcessGroupStatus == mergedProcessGroupStatus) {
-                continue;
-            }
-
-            final ProcessGroupStatusSnapshotDTO nodeSnapshot = 
nodeProcessGroupStatus.getAggregateSnapshot();
-            for (final RemoteProcessGroupStatusSnapshotDTO 
remoteProcessGroupStatus : nodeSnapshot.getRemoteProcessGroupStatusSnapshots()) 
{
-                final List<String> nodeAuthorizationIssues = 
remoteProcessGroupStatus.getAuthorizationIssues();
-                if (!nodeAuthorizationIssues.isEmpty()) {
-                    for (final ListIterator<String> iter = 
nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
-                        final String Issue = iter.next();
-                        iter.set("[" + nodeId.getApiAddress() + ":" + 
nodeId.getApiPort() + "] -- " + Issue);
-                    }
-                    
remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
-                }
-            }
-
-            StatusMerger.merge(mergedProcessGroupStatus, 
nodeProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), 
nodeId.getApiPort());
-        }
-    }
-
-
-    private void mergeProcessorStatus(final ProcessorStatusDTO statusDto, 
final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, 
ProcessorStatusDTO> resultMap) {
-        final ProcessorStatusDTO mergedProcessorStatus = statusDto;
-        mergedProcessorStatus.setNodeSnapshots(new 
ArrayList<NodeProcessorStatusSnapshotDTO>());
-
-        final NodeProcessorStatusSnapshotDTO selectedNodeSnapshot = new 
NodeProcessorStatusSnapshotDTO();
-        
selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
-        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
-        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
-        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
-
-        mergedProcessorStatus.getNodeSnapshots().add(selectedNodeSnapshot);
-
-        // merge the other nodes
-        for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final ProcessorStatusDTO nodeProcessorStatus = entry.getValue();
-            if (nodeProcessorStatus == statusDto) {
-                continue;
-            }
-
-            StatusMerger.merge(mergedProcessorStatus, nodeProcessorStatus, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
-        }
-    }
-
-    private void mergeConnectionStatus(final ConnectionStatusDTO statusDto, 
final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, 
ConnectionStatusDTO> resultMap) {
-        final ConnectionStatusDTO mergedConnectionStatus = statusDto;
-        mergedConnectionStatus.setNodeSnapshots(new 
ArrayList<NodeConnectionStatusSnapshotDTO>());
-
-        final NodeConnectionStatusSnapshotDTO selectedNodeSnapshot = new 
NodeConnectionStatusSnapshotDTO();
-        
selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
-        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
-        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
-        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
-
-        mergedConnectionStatus.getNodeSnapshots().add(selectedNodeSnapshot);
-
-        // merge the other nodes
-        for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final ConnectionStatusDTO nodeConnectionStatus = entry.getValue();
-            if (nodeConnectionStatus == statusDto) {
-                continue;
-            }
-
-            StatusMerger.merge(mergedConnectionStatus, nodeConnectionStatus, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
-        }
-    }
-
-    private void mergePortStatus(final PortStatusDTO statusDto, final 
NodeIdentifier selectedNodeId, final Map<NodeIdentifier, PortStatusDTO> 
resultMap) {
-        final PortStatusDTO mergedPortStatus = statusDto;
-        mergedPortStatus.setNodeSnapshots(new 
ArrayList<NodePortStatusSnapshotDTO>());
-
-        final NodePortStatusSnapshotDTO selectedNodeSnapshot = new 
NodePortStatusSnapshotDTO();
-        
selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
-        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
-        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
-        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
-
-        mergedPortStatus.getNodeSnapshots().add(selectedNodeSnapshot);
-
-        // merge the other nodes
-        for (final Map.Entry<NodeIdentifier, PortStatusDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final PortStatusDTO nodePortStatus = entry.getValue();
-            if (nodePortStatus == statusDto) {
-                continue;
-            }
-
-            StatusMerger.merge(mergedPortStatus, nodePortStatus, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
-        }
-    }
-
-    private void mergeRemoteProcessGroupStatus(final 
RemoteProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, 
final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultMap) {
-        final RemoteProcessGroupStatusDTO mergedRemoteProcessGroupStatus = 
statusDto;
-        mergedRemoteProcessGroupStatus.setNodeSnapshots(new 
ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>());
-
-        final NodeRemoteProcessGroupStatusSnapshotDTO selectedNodeSnapshot = 
new NodeRemoteProcessGroupStatusSnapshotDTO();
-        
selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
-        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
-        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
-        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
-
-        
mergedRemoteProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot);
-
-        // merge the other nodes
-        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> 
entry : resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final RemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatus = 
entry.getValue();
-            if (nodeRemoteProcessGroupStatus == statusDto) {
-                continue;
-            }
-
-            StatusMerger.merge(mergedRemoteProcessGroupStatus, 
nodeRemoteProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), 
nodeId.getApiPort());
-        }
-    }
-
-    private void mergeControllerStatus(final ControllerStatusDTO statusDto, 
final Map<NodeIdentifier, ControllerStatusDTO> resultMap) {
-        ControllerStatusDTO mergedStatus = statusDto;
-        for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final ControllerStatusDTO nodeStatus = entry.getValue();
-
-            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
-            for (final BulletinDTO bulletin : nodeStatus.getBulletins()) {
-                bulletin.setNodeAddress(nodeAddress);
-            }
-            for (final BulletinDTO bulletin : 
nodeStatus.getControllerServiceBulletins()) {
-                bulletin.setNodeAddress(nodeAddress);
-            }
-            for (final BulletinDTO bulletin : 
nodeStatus.getReportingTaskBulletins()) {
-                bulletin.setNodeAddress(nodeAddress);
-            }
-
-            if (nodeStatus == mergedStatus) {
-                continue;
-            }
-
-            StatusMerger.merge(mergedStatus, nodeStatus);
-        }
-
-        final int totalNodeCount = getNodeIds().size();
-        final int connectedNodeCount = getNodeIds(Status.CONNECTED).size();
-
-        final List<Bulletin> ncmControllerBulletins = 
getBulletinRepository().findBulletinsForController();
-        
mergedStatus.setBulletins(mergeNCMBulletins(mergedStatus.getBulletins(), 
ncmControllerBulletins));
-
-        // get the controller service bulletins
-        final BulletinQuery controllerServiceQuery = new 
BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
-        final List<Bulletin> ncmServiceBulletins = 
getBulletinRepository().findBulletins(controllerServiceQuery);
-        
mergedStatus.setControllerServiceBulletins(mergeNCMBulletins(mergedStatus.getControllerServiceBulletins(),
 ncmServiceBulletins));
-
-        // get the reporting task bulletins
-        final BulletinQuery reportingTaskQuery = new 
BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
-        final List<Bulletin> ncmReportingTaskBulletins = 
getBulletinRepository().findBulletins(reportingTaskQuery);
-        
mergedStatus.setReportingTaskBulletins(mergeNCMBulletins(mergedStatus.getReportingTaskBulletins(),
 ncmReportingTaskBulletins));
-
-        mergedStatus.setConnectedNodeCount(connectedNodeCount);
-        mergedStatus.setTotalNodeCount(totalNodeCount);
-        StatusMerger.updatePrettyPrintedFields(mergedStatus);
-    }
-
-    private List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> 
nodeBulletins, final List<Bulletin> ncmBulletins) {
-        if (ncmBulletins == null || ncmBulletins.isEmpty()) {
-            return nodeBulletins;
-        }
-
-        final List<BulletinDTO> mergedBulletins = new 
ArrayList<>(nodeBulletins.size() + ncmBulletins.size());
-        mergedBulletins.addAll(nodeBulletins);
-        mergedBulletins.addAll(createBulletinDtos(ncmBulletins));
-        return mergedBulletins;
-    }
-
-    private void mergeBulletinBoard(final BulletinBoardDTO nodeBulletinBoard, 
final Map<NodeIdentifier, BulletinBoardDTO> resultMap) {
-        final List<BulletinDTO> bulletinDtos = new ArrayList<>();
-        for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final BulletinBoardDTO boardDto = entry.getValue();
-            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
-
-            for (final BulletinDTO bulletin : boardDto.getBulletins()) {
-                bulletin.setNodeAddress(nodeAddress);
-                bulletinDtos.add(bulletin);
-            }
-        }
-
-        Collections.sort(bulletinDtos, new Comparator<BulletinDTO>() {
-            @Override
-            public int compare(final BulletinDTO o1, final BulletinDTO o2) {
-                final int timeComparison = 
o1.getTimestamp().compareTo(o2.getTimestamp());
-                if (timeComparison != 0) {
-                    return timeComparison;
-                }
-
-                return o1.getNodeAddress().compareTo(o2.getNodeAddress());
-            }
-        });
-
-        nodeBulletinBoard.setBulletins(bulletinDtos);
-    }
-
-    /**
-     * Creates BulletinDTOs for the specified Bulletins.
-     *
-     * @param bulletins bulletin
-     * @return dto
-     */
-    public List<BulletinDTO> createBulletinDtos(final List<Bulletin> 
bulletins) {
-        final List<BulletinDTO> bulletinDtos = new 
ArrayList<>(bulletins.size());
-        for (final Bulletin bulletin : bulletins) {
-            bulletinDtos.add(createBulletinDto(bulletin));
-        }
-        return bulletinDtos;
-    }
-
-    /**
-     * Creates a BulletinDTO for the specified Bulletin.
-     *
-     * @param bulletin bulletin
-     * @return dto
-     */
-    public BulletinDTO createBulletinDto(final Bulletin bulletin) {
-        final BulletinDTO dto = new BulletinDTO();
-        dto.setId(bulletin.getId());
-        dto.setNodeAddress(bulletin.getNodeAddress());
-        dto.setTimestamp(bulletin.getTimestamp());
-        dto.setGroupId(bulletin.getGroupId());
-        dto.setSourceId(bulletin.getSourceId());
-        dto.setSourceName(bulletin.getSourceName());
-        dto.setCategory(bulletin.getCategory());
-        dto.setLevel(bulletin.getLevel());
-        dto.setMessage(bulletin.getMessage());
-        return dto;
-    }
-
-    private void mergeProvenanceQueryResults(final ProvenanceDTO 
provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final 
Set<NodeResponse> problematicResponses) {
-        final ProvenanceResultsDTO results = provenanceDto.getResults();
-        final ProvenanceRequestDTO request = provenanceDto.getRequest();
-        final List<ProvenanceEventDTO> allResults = new ArrayList<>(1024);
-
-        final Set<String> errors = new HashSet<>();
-        Date oldestEventDate = new Date();
-        int percentageComplete = 0;
-        boolean finished = true;
-
-        long totalRecords = 0;
-        for (final Map.Entry<NodeIdentifier, ProvenanceDTO> entry : 
resultMap.entrySet()) {
-            final NodeIdentifier nodeIdentifier = entry.getKey();
-            final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + 
nodeIdentifier.getApiPort();
-
-            final ProvenanceDTO nodeDto = entry.getValue();
-            final ProvenanceResultsDTO nodeResultDto = nodeDto.getResults();
-            if (nodeResultDto != null && nodeResultDto.getProvenanceEvents() 
!= null) {
-                // increment the total number of records
-                totalRecords += nodeResultDto.getTotalCount();
-
-                // populate the cluster identifier
-                for (final ProvenanceEventDTO eventDto : 
nodeResultDto.getProvenanceEvents()) {
-                    eventDto.setClusterNodeId(nodeIdentifier.getId());
-                    eventDto.setClusterNodeAddress(nodeAddress);
-                    // add node identifier to the event's id so that it is 
unique across cluster
-                    eventDto.setId(nodeIdentifier.getId() + eventDto.getId());
-                    allResults.add(eventDto);
-                }
-            }
-
-            if (nodeResultDto.getOldestEvent() != null && 
nodeResultDto.getOldestEvent().before(oldestEventDate)) {
-                oldestEventDate = nodeResultDto.getOldestEvent();
-            }
-
-            if (nodeResultDto.getErrors() != null) {
-                for (final String error : nodeResultDto.getErrors()) {
-                    errors.add(nodeAddress + " -- " + error);
-                }
-            }
-
-            percentageComplete += nodeDto.getPercentCompleted();
-            if (!nodeDto.isFinished()) {
-                finished = false;
-            }
-        }
-        percentageComplete /= resultMap.size();
-
-        // consider any problematic responses as errors
-        for (final NodeResponse problematicResponse : problematicResponses) {
-            final NodeIdentifier problemNode = problematicResponse.getNodeId();
-            final String problemNodeAddress = problemNode.getApiAddress() + 
":" + problemNode.getApiPort();
-            errors.add(String.format("%s -- Request did not complete 
successfully (Status code: %s)", problemNodeAddress, 
problematicResponse.getStatus()));
-        }
-
-        // Since we get back up to the maximum number of results from each 
node, we need to sort those values and then
-        // grab only the first X number of them. We do a sort based on time, 
such that the newest are included.
-        // If 2 events have the same timestamp, we do a secondary sort based 
on Cluster Node Identifier. If those are
-        // equal, we perform a terciary sort based on the the event id
-        Collections.sort(allResults, new Comparator<ProvenanceEventDTO>() {
-            @Override
-            public int compare(final ProvenanceEventDTO o1, final 
ProvenanceEventDTO o2) {
-                final int eventTimeComparison = 
o1.getEventTime().compareTo(o2.getEventTime());
-                if (eventTimeComparison != 0) {
-                    return -eventTimeComparison;
-                }
-
-                final String nodeId1 = o1.getClusterNodeId();
-                final String nodeId2 = o2.getClusterNodeId();
-                final int nodeIdComparison;
-                if (nodeId1 == null && nodeId2 == null) {
-                    nodeIdComparison = 0;
-                } else if (nodeId1 == null) {
-                    nodeIdComparison = 1;
-                } else if (nodeId2 == null) {
-                    nodeIdComparison = -1;
-                } else {
-                    nodeIdComparison = -nodeId1.compareTo(nodeId2);
-                }
-
-                if (nodeIdComparison != 0) {
-                    return nodeIdComparison;
-                }
-
-                return -Long.compare(o1.getEventId(), o2.getEventId());
-            }
-        });
-
-        final int maxResults = request.getMaxResults().intValue();
-        final List<ProvenanceEventDTO> selectedResults;
-        if (allResults.size() < maxResults) {
-            selectedResults = allResults;
-        } else {
-            selectedResults = allResults.subList(0, maxResults);
-        }
-
-        // include any errors
-        if (errors.size() > 0) {
-            results.setErrors(errors);
-        }
-
-        results.setTotalCount(totalRecords);
-        results.setTotal(FormatUtils.formatCount(totalRecords));
-        results.setProvenanceEvents(selectedResults);
-        results.setOldestEvent(oldestEventDate);
-        results.setGenerated(new Date());
-        provenanceDto.setPercentCompleted(percentageComplete);
-        provenanceDto.setFinished(finished);
-    }
-
-    private void mergeRemoteProcessGroup(final RemoteProcessGroupDTO 
remoteProcessGroup, final Map<NodeIdentifier, RemoteProcessGroupDTO> 
remoteProcessGroupMap) {
-        final RemoteProcessGroupContentsDTO remoteProcessGroupContents = 
remoteProcessGroup.getContents();
-
-        Boolean mergedIsTargetSecure = null;
-        final List<String> mergedAuthorizationIssues = new ArrayList<>();
-        final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new 
HashSet<>();
-        final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new 
HashSet<>();
-
-        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry 
: remoteProcessGroupMap.entrySet()) {
-            final NodeIdentifier nodeId = nodeEntry.getKey();
-            final RemoteProcessGroupDTO nodeRemoteProcessGroupDto = 
nodeEntry.getValue();
-
-            // merge the  issues
-            final List<String> nodeAuthorizationIssues = 
nodeRemoteProcessGroupDto.getAuthorizationIssues();
-            if (nodeAuthorizationIssues != null && 
!nodeAuthorizationIssues.isEmpty()) {
-                for (final String nodeAuthorizationIssue : 
nodeAuthorizationIssues) {
-                    mergedAuthorizationIssues.add(nodeId.getApiAddress() + ":" 
+ nodeId.getApiPort() + " -- " + nodeAuthorizationIssue);
-                }
-            }
-
-            // use the first target secure flag since they will all be the same
-            final Boolean nodeIsTargetSecure = 
nodeRemoteProcessGroupDto.isTargetSecure();
-            if (mergedIsTargetSecure == null) {
-                mergedIsTargetSecure = nodeIsTargetSecure;
-            }
-
-            // merge the ports in the contents
-            final RemoteProcessGroupContentsDTO 
nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroupDto.getContents();
-            if (remoteProcessGroupContents != null && 
nodeRemoteProcessGroupContentsDto != null) {
-                if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) 
{
-                    
mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts());
-                }
-                if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != 
null) {
-                    
mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts());
-                }
-            }
-        }
-
-        if (remoteProcessGroupContents != null) {
-            if (!mergedInputPorts.isEmpty()) {
-                remoteProcessGroupContents.setInputPorts(mergedInputPorts);
-            }
-            if (!mergedOutputPorts.isEmpty()) {
-                remoteProcessGroupContents.setOutputPorts(mergedOutputPorts);
-            }
-        }
-
-        if (mergedIsTargetSecure != null) {
-            remoteProcessGroup.setTargetSecure(mergedIsTargetSecure);
-        }
-
-        if (!mergedAuthorizationIssues.isEmpty()) {
-            
remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues);
-        }
-    }
-
-    private void mergeControllerServiceReferences(
-            final Set<ControllerServiceReferencingComponentDTO> 
referencingComponents, final Map<NodeIdentifier, 
Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
-        final Map<String, Integer> activeThreadCounts = new HashMap<>();
-        final Map<String, String> states = new HashMap<>();
-        for (final Map.Entry<NodeIdentifier, 
Set<ControllerServiceReferencingComponentDTO>> nodeEntry : 
referencingComponentMap.entrySet()) {
-            final Set<ControllerServiceReferencingComponentDTO> 
nodeReferencingComponents = nodeEntry.getValue();
-
-            // go through all the nodes referencing components
-            if (nodeReferencingComponents != null) {
-                for (final ControllerServiceReferencingComponentDTO 
nodeReferencingComponent : nodeReferencingComponents) {
-                    // handle active thread counts
-                    if (nodeReferencingComponent.getActiveThreadCount() != 
null && nodeReferencingComponent.getActiveThreadCount() > 0) {
-                        final Integer current = 
activeThreadCounts.get(nodeReferencingComponent.getId());
-                        if (current == null) {
-                            
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount());
-                        } else {
-                            
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount() + current);
-                        }
-                    }
-
-                    // handle controller service state
-                    final String state = 
states.get(nodeReferencingComponent.getId());
-                    if (state == null) {
-                        if 
(ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState()))
 {
-                            states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.DISABLING.name());
-                        } else if 
(ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState()))
 {
-                            states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.ENABLING.name());
-                        }
-                    }
-                }
-            }
-        }
-
-        // go through each referencing components
-        for (final ControllerServiceReferencingComponentDTO 
referencingComponent : referencingComponents) {
-            final Integer activeThreadCount = 
activeThreadCounts.get(referencingComponent.getId());
-            if (activeThreadCount != null) {
-                referencingComponent.setActiveThreadCount(activeThreadCount);
-            }
-
-            final String state = states.get(referencingComponent.getId());
-            if (state != null) {
-                referencingComponent.setState(state);
-            }
-        }
-    }
-
-    private void mergeControllerService(final ControllerServiceDTO 
controllerService, final Map<NodeIdentifier, ControllerServiceDTO> 
controllerServiceMap) {
-        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
-        final Set<ControllerServiceReferencingComponentDTO> 
referencingComponents = controllerService.getReferencingComponents();
-        final Map<NodeIdentifier, 
Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = 
new HashMap<>();
-
-        String state = null;
-        for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : 
controllerServiceMap.entrySet()) {
-            final NodeIdentifier nodeId = nodeEntry.getKey();
-            final ControllerServiceDTO nodeControllerService = 
nodeEntry.getValue();
-
-            if (state == null) {
-                if 
(ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState()))
 {
-                    state = ControllerServiceState.DISABLING.name();
-                } else if 
(ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState()))
 {
-                    state = ControllerServiceState.ENABLING.name();
-                }
-            }
-
-            for (final ControllerServiceReferencingComponentDTO 
nodeReferencingComponents : nodeControllerService.getReferencingComponents()) {
-                nodeReferencingComponentsMap.put(nodeId, 
nodeReferencingComponents.getReferencingComponents());
-            }
-
-            // merge the validation errors
-            mergeValidationErrors(validationErrorMap, nodeId, 
nodeControllerService.getValidationErrors());
-        }
-
-        // merge the referencing components
-        mergeControllerServiceReferences(referencingComponents, 
nodeReferencingComponentsMap);
-
-        // store the 'transition' state is applicable
-        if (state != null) {
-            controllerService.setState(state);
-        }
-
-        // set the merged the validation errors
-        
controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 controllerServiceMap.size()));
-    }
-
-    private void mergeReportingTask(final ReportingTaskDTO reportingTask, 
final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) {
-        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
-
-        int activeThreadCount = 0;
-        for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : 
reportingTaskMap.entrySet()) {
-            final NodeIdentifier nodeId = nodeEntry.getKey();
-            final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue();
-
-            if (nodeReportingTask.getActiveThreadCount() != null) {
-                activeThreadCount += nodeReportingTask.getActiveThreadCount();
-            }
-
-            // merge the validation errors
-            mergeValidationErrors(validationErrorMap, nodeId, 
nodeReportingTask.getValidationErrors());
-        }
-
-        // set the merged active thread counts
-        reportingTask.setActiveThreadCount(activeThreadCount);
-
-        // set the merged the validation errors
-        
reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 reportingTaskMap.size()));
-    }
-
-    /**
-     * Merges the validation errors into the specified map, recording the 
corresponding node identifier.
-     *
-     * @param validationErrorMap map
-     * @param nodeId id
-     * @param nodeValidationErrors errors
-     */
-    public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> 
validationErrorMap, final NodeIdentifier nodeId, final Collection<String> 
nodeValidationErrors) {
-        if (nodeValidationErrors != null) {
-            for (final String nodeValidationError : nodeValidationErrors) {
-                Set<NodeIdentifier> nodeSet = 
validationErrorMap.get(nodeValidationError);
-                if (nodeSet == null) {
-                    nodeSet = new HashSet<>();
-                    validationErrorMap.put(nodeValidationError, nodeSet);
-                }
-                nodeSet.add(nodeId);
-            }
-        }
-    }
-
-    /**
-     * Normalizes the validation errors by prepending the corresponding nodes 
when the error does not exist across all nodes.
-     *
-     * @param validationErrorMap map
-     * @param totalNodes total
-     * @return normalized errors
-     */
-    public Set<String> normalizedMergedValidationErrors(final Map<String, 
Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
-        final Set<String> normalizedValidationErrors = new HashSet<>();
-        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : 
validationErrorMap.entrySet()) {
-            final String msg = validationEntry.getKey();
-            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
-
-            if (nodeIds.size() == totalNodes) {
-                normalizedValidationErrors.add(msg);
-            } else {
-                for (final NodeIdentifier nodeId : nodeIds) {
-                    normalizedValidationErrors.add(nodeId.getApiAddress() + 
":" + nodeId.getApiPort() + " -- " + msg);
-                }
-            }
-        }
-        return normalizedValidationErrors;
-    }
-
-
-    /**
-     * Merges the listing requests in the specified map into the specified 
listing request
-     *
-     * @param listingRequest the target listing request
-     * @param listingRequestMap the mapping of all responses being merged
-     */
-    private void mergeListingRequests(final ListingRequestDTO listingRequest, 
final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) {
-        final Comparator<FlowFileSummaryDTO> comparator = new 
Comparator<FlowFileSummaryDTO>() {
-            @Override
-            public int compare(final FlowFileSummaryDTO dto1, final 
FlowFileSummaryDTO dto2) {
-                int positionCompare = 
dto1.getPosition().compareTo(dto2.getPosition());
-                if (positionCompare != 0) {
-                    return positionCompare;
-                }
-
-                final String address1 = dto1.getClusterNodeAddress();
-                final String address2 = dto2.getClusterNodeAddress();
-                if (address1 == null && address2 == null) {
-                    return 0;
-                }
-                if (address1 == null) {
-                    return 1;
-                }
-                if (address2 == null) {
-                    return -1;
-                }
-                return address1.compareTo(address2);
-            }
-        };
-
-        final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new 
TreeSet<>(comparator);
-
-        ListFlowFileState state = null;
-        int numStepsCompleted = 0;
-        int numStepsTotal = 0;
-        int objectCount = 0;
-        long byteCount = 0;
-        boolean finished = true;
-        for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : 
listingRequestMap.entrySet()) {
-            final NodeIdentifier nodeIdentifier = entry.getKey();
-            final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + 
nodeIdentifier.getApiPort();
-
-            final ListingRequestDTO nodeRequest = entry.getValue();
-
-            numStepsTotal++;
-            if (Boolean.TRUE.equals(nodeRequest.getFinished())) {
-                numStepsCompleted++;
-            }
-
-            final QueueSizeDTO nodeQueueSize = nodeRequest.getQueueSize();
-         

<TRUNCATED>

Reply via email to