http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index c0f4c63..3d21b55 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -30,7 +30,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -73,7 +72,6 @@ import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextImpl;
@@ -109,7 +107,6 @@ import org.apache.nifi.cluster.node.Node.Status;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
@@ -121,7 +118,6 @@ import 
org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import 
org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@@ -153,11 +149,13 @@ import 
org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.SortedStateUtils;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor;
-import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
+import 
org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
 import org.apache.nifi.controller.status.history.StatusHistoryUtil;
 import org.apache.nifi.controller.status.history.StatusSnapshot;
 import org.apache.nifi.diagnostics.GarbageCollection;
@@ -188,7 +186,9 @@ import 
org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
 import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.ReportingTask;
@@ -202,6 +202,7 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
@@ -209,7 +210,6 @@ import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.ListingRequestDTO;
-import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.QueueSizeDTO;
@@ -223,18 +223,23 @@ import 
org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.StatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusMerger;
 import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
 import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.ControllerStatusEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.FlowSnippetEntity;
 import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
 import org.apache.nifi.web.api.entity.ProvenanceEntity;
@@ -243,6 +248,7 @@ import 
org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -321,6 +327,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
 
     public static final Pattern PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
+    public static final Pattern GROUP_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status");
+    public static final Pattern CONTROLLER_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/status");
     public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
     public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
 
@@ -337,6 +345,15 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
     public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state");
 
+    public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN =
+        
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history");
+    public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history");
+    public static final Pattern 
REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern
+        
.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status/history");
+    public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern
+        
.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status/history");
+
+
     @Deprecated
     public static final Pattern QUEUE_CONTENTS_URI = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
     public static final Pattern DROP_REQUESTS_URI = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests");
@@ -378,7 +395,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     private final BulletinRepository bulletinRepository;
     private final String instanceId;
     private final FlowEngine reportingTaskEngine;
-    private final Map<NodeIdentifier, ComponentStatusRepository> 
componentMetricsRepositoryMap = new HashMap<>();
     private final StandardProcessScheduler processScheduler;
     private final StateManagerProvider stateManagerProvider;
     private final long componentStatusSnapshotMillis;
@@ -463,7 +479,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, 
new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
         processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 
10);
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
-        processScheduler.scheduleFrameworkTask(new CaptureComponentMetrics(), 
"Capture Component Metrics", componentStatusSnapshotMillis, 
componentStatusSnapshotMillis, TimeUnit.MILLISECONDS);
 
         controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler, bulletinRepository, 
stateManagerProvider);
     }
@@ -620,7 +635,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return MessageType.CONNECTION_REQUEST == msg.getType()
                 || MessageType.HEARTBEAT == msg.getType()
                 || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
-                || MessageType.BULLETINS == msg.getType()
                 || MessageType.RECONNECTION_FAILURE == msg.getType();
     }
 
@@ -654,10 +668,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     }
                 }, "Handle Reconnection Failure Message from " + 
((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
                 return null;
-            case BULLETINS:
-                final NodeBulletinsMessage bulletinsMessage = 
(NodeBulletinsMessage) protocolMessage;
-                handleBulletins(bulletinsMessage.getBulletins());
-                return null;
             default:
                 throw new ProtocolException("No handler defined for message 
type: " + protocolMessage.getType());
         }
@@ -1686,22 +1696,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         processScheduler.enableReportingTask(reportingTask);
     }
 
-    /**
-     * Handle a bulletins message.
-     *
-     * @param bulletins bulletins
-     */
-    public void handleBulletins(final NodeBulletins bulletins) {
-        final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
-        final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + 
nodeIdentifier.getApiPort();
-
-        // unmarshal the message
-        final BulletinsPayload payload = 
BulletinsPayload.unmarshal(bulletins.getPayload());
-        for (final Bulletin bulletin : payload.getBulletins()) {
-            bulletin.setNodeAddress(nodeAddress);
-            bulletinRepository.addBulletin(bulletin);
-        }
-    }
 
     /**
      * Handles a node's heartbeat. If this heartbeat is a node's first 
heartbeat since its connection request, then the manager will mark the node as 
connected. If the node was previously disconnected
@@ -2442,6 +2436,14 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return ("GET".equalsIgnoreCase(method) || 
"PUT".equalsIgnoreCase(method)) && 
PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
+    private static boolean isGroupStatusEndpoint(final URI uri, final String 
method) {
+        return "GET".equalsIgnoreCase(method) && 
GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isControllerStatusEndpoint(final URI uri, final 
String method) {
+        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isTemplateEndpoint(final URI uri, final String 
method) {
         return "POST".equalsIgnoreCase(method) && 
TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches();
     }
@@ -2454,6 +2456,23 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return "GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
     }
 
+    private static boolean isProcessorStatusHistoryEndpoint(final URI uri, 
final String method) {
+        return "GET".equalsIgnoreCase(method) && 
PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isProcessGroupStatusHistoryEndpoint(final URI uri, 
final String method) {
+        return "GET".equalsIgnoreCase(method) && 
PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isRemoteProcessGroupStatusHistoryEndpoint(final URI 
uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    private static boolean isConnectionStatusHistoryEndpoint(final URI uri, 
final String method) {
+        return "GET".equalsIgnoreCase(method) && 
CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+
     private static boolean isRemoteProcessGroupEndpoint(final URI uri, final 
String method) {
         if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) {
             return true;
@@ -2556,7 +2575,10 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 || isControllerServicesEndpoint(uri, method) || 
isControllerServiceEndpoint(uri, method)
                 || isControllerServiceReferenceEndpoint(uri, method) || 
isControllerServiceStateEndpoint(uri, method)
                 || isReportingTasksEndpoint(uri, method) || 
isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, 
method)
-                || isDropRequestEndpoint(uri, method) || 
isListFlowFilesEndpoint(uri, method);
+                || isDropRequestEndpoint(uri, method) || 
isListFlowFilesEndpoint(uri, method)
+            || isGroupStatusEndpoint(uri, method) || 
isControllerStatusEndpoint(uri, method)
+            || isProcessorStatusHistoryEndpoint(uri, method) || 
isProcessGroupStatusHistoryEndpoint(uri, method)
+            || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || 
isConnectionStatusHistoryEndpoint(uri, method);
     }
 
     private void mergeProcessorValidationErrors(final ProcessorDTO processor, 
Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2608,6 +2630,159 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         componentState.getLocalState().setState(localStateEntries);
     }
 
+    @SuppressWarnings("unchecked")
+    private void updateBulletins(final String nodeAddress, final Collection<? 
extends StatusDTO>... dtos) {
+        for (final Collection<? extends StatusDTO> collection : dtos) {
+            if (collection != null) {
+                for (final StatusDTO dto : collection) {
+                    final List<BulletinDTO> bulletins = dto.getBulletins();
+                    if (bulletins != null) {
+                        for (final BulletinDTO bulletin : bulletins) {
+                            bulletin.setNodeAddress(nodeAddress);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void updateBulletins(final ProcessGroupStatusDTO dto, final String 
nodeAddress) {
+        for (final BulletinDTO bulletin : dto.getBulletins()) {
+            bulletin.setNodeAddress(nodeAddress);
+        }
+
+        updateBulletins(nodeAddress, dto.getProcessorStatus(), 
dto.getInputPortStatus(), dto.getOutputPortStatus(), 
dto.getRemoteProcessGroupStatus());
+
+        if (dto.getProcessGroupStatus() != null) {
+            for (final ProcessGroupStatusDTO childGroup : 
dto.getProcessGroupStatus()) {
+                updateBulletins(childGroup, nodeAddress);
+            }
+        }
+    }
+
+    private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final 
Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) {
+        ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto;
+        for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : 
resultMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ProcessGroupStatusDTO nodeProcessGroupStatus = 
entry.getValue();
+
+            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
+            updateBulletins(mergedProcessGroupStatus, nodeAddress);
+
+            if (nodeProcessGroupStatus == mergedProcessGroupStatus) {
+                continue;
+            }
+
+            final ProcessGroupStatusDTO nodeClone = 
nodeProcessGroupStatus.clone();
+            for (final RemoteProcessGroupStatusDTO remoteProcessGroupStatus : 
nodeClone.getRemoteProcessGroupStatus()) {
+                final List<String> nodeAuthorizationIssues = 
remoteProcessGroupStatus.getAuthorizationIssues();
+                if (!nodeAuthorizationIssues.isEmpty()) {
+                    for (final ListIterator<String> iter = 
nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
+                        final String Issue = iter.next();
+                        iter.set("[" + nodeId.getApiAddress() + ":" + 
nodeId.getApiPort() + "] -- " + Issue);
+                    }
+                    
remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
+                }
+            }
+
+            StatusMerger.merge(mergedProcessGroupStatus, nodeClone);
+        }
+
+        StatusMerger.updatePrettyPrintedFields(mergedProcessGroupStatus);
+    }
+
+
    /**
     * Merges the per-node controller status responses into the client's response.
     * Node bulletins (controller, controller-service, and reporting-task) are stamped
     * with the owning node's address, node statuses are folded together via
     * {@link StatusMerger}, and finally the NCM's own locally generated bulletins
     * and cluster node counts are added to the merged result.
     *
     * @param statusDto the status from the client's response; merged into in place
     * @param resultMap the status returned by each node, keyed by node identifier
     */
    private void mergeControllerStatus(final ControllerStatusDTO statusDto, final Map<NodeIdentifier, ControllerStatusDTO> resultMap) {
        ControllerStatusDTO mergedStatus = statusDto;
        for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : resultMap.entrySet()) {
            final NodeIdentifier nodeId = entry.getKey();
            final ControllerStatusDTO nodeStatus = entry.getValue();

            // stamp each node's own bulletins with that node's "host:port" address
            // NOTE(review): assumes the bulletin lists are non-null here — confirm
            final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
            for (final BulletinDTO bulletin : nodeStatus.getBulletins()) {
                bulletin.setNodeAddress(nodeAddress);
            }
            for (final BulletinDTO bulletin : nodeStatus.getControllerServiceBulletins()) {
                bulletin.setNodeAddress(nodeAddress);
            }
            for (final BulletinDTO bulletin : nodeStatus.getReportingTaskBulletins()) {
                bulletin.setNodeAddress(nodeAddress);
            }

            // the client's own response is the merge target; don't merge it into itself
            if (nodeStatus == mergedStatus) {
                continue;
            }

            StatusMerger.merge(mergedStatus, nodeStatus);
        }

        final int totalNodeCount = getNodeIds().size();
        final int connectedNodeCount = getNodeIds(Status.CONNECTED).size();

        // add bulletins generated locally by the cluster manager itself
        final List<Bulletin> ncmControllerBulletins = getBulletinRepository().findBulletinsForController();
        mergedStatus.setBulletins(mergeNCMBulletins(mergedStatus.getBulletins(), ncmControllerBulletins));

        // get the controller service bulletins
        final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
        final List<Bulletin> ncmServiceBulletins = getBulletinRepository().findBulletins(controllerServiceQuery);
        mergedStatus.setControllerServiceBulletins(mergeNCMBulletins(mergedStatus.getControllerServiceBulletins(), ncmServiceBulletins));

        // get the reporting task bulletins
        final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
        final List<Bulletin> ncmReportingTaskBulletins = getBulletinRepository().findBulletins(reportingTaskQuery);
        mergedStatus.setReportingTaskBulletins(mergeNCMBulletins(mergedStatus.getReportingTaskBulletins(), ncmReportingTaskBulletins));

        // report cluster membership alongside the merged status
        mergedStatus.setConnectedNodeCount(connectedNodeCount);
        mergedStatus.setTotalNodeCount(totalNodeCount);
        StatusMerger.updatePrettyPrintedFields(mergedStatus);
    }
+
+    private List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> 
nodeBulletins, final List<Bulletin> ncmBulletins) {
+        if (ncmBulletins == null || ncmBulletins.isEmpty()) {
+            return nodeBulletins;
+        }
+
+        final List<BulletinDTO> mergedBulletins = new 
ArrayList<>(nodeBulletins.size() + ncmBulletins.size());
+        mergedBulletins.addAll(nodeBulletins);
+        mergedBulletins.addAll(createBulletinDtos(ncmBulletins));
+        return mergedBulletins;
+    }
+
+    /**
+     * Creates BulletinDTOs for the specified Bulletins.
+     *
+     * @param bulletins bulletin
+     * @return dto
+     */
+    public List<BulletinDTO> createBulletinDtos(final List<Bulletin> 
bulletins) {
+        final List<BulletinDTO> bulletinDtos = new 
ArrayList<>(bulletins.size());
+        for (final Bulletin bulletin : bulletins) {
+            bulletinDtos.add(createBulletinDto(bulletin));
+        }
+        return bulletinDtos;
+    }
+
+    /**
+     * Creates a BulletinDTO for the specified Bulletin.
+     *
+     * @param bulletin bulletin
+     * @return dto
+     */
+    public BulletinDTO createBulletinDto(final Bulletin bulletin) {
+        final BulletinDTO dto = new BulletinDTO();
+        dto.setId(bulletin.getId());
+        dto.setNodeAddress(bulletin.getNodeAddress());
+        dto.setTimestamp(bulletin.getTimestamp());
+        dto.setGroupId(bulletin.getGroupId());
+        dto.setSourceId(bulletin.getSourceId());
+        dto.setSourceName(bulletin.getSourceName());
+        dto.setCategory(bulletin.getCategory());
+        dto.setLevel(bulletin.getLevel());
+        dto.setMessage(bulletin.getMessage());
+        return dto;
+    }
+
     private void mergeProvenanceQueryResults(final ProvenanceDTO 
provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final 
Set<NodeResponse> problematicResponses) {
         final ProvenanceResultsDTO results = provenanceDto.getResults();
         final ProvenanceRequestDTO request = provenanceDto.getRequest();
@@ -3545,6 +3720,70 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             mergeComponentState(componentState, resultsMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isGroupStatusEndpoint(uri, 
method)) {
+            final ProcessGroupStatusEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class);
+            final ProcessGroupStatusDTO statusRequest = 
responseEntity.getProcessGroupStatus();
+
+            final Map<NodeIdentifier, ProcessGroupStatusDTO> resultsMap = new 
HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ProcessGroupStatusEntity nodeResponseEntity = 
nodeResponse == clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class);
+                final ProcessGroupStatusDTO nodeStatus = 
nodeResponseEntity.getProcessGroupStatus();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeGroupStatus(statusRequest, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && 
isControllerStatusEndpoint(uri, method)) {
+            final ControllerStatusEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ControllerStatusEntity.class);
+            final ControllerStatusDTO statusRequest = 
responseEntity.getControllerStatus();
+
+            final Map<NodeIdentifier, ControllerStatusDTO> resultsMap = new 
HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ControllerStatusEntity nodeResponseEntity = nodeResponse 
== clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerStatusEntity.class);
+                final ControllerStatusDTO nodeStatus = 
nodeResponseEntity.getControllerStatus();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeControllerStatus(statusRequest, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && 
isProcessorStatusHistoryEndpoint(uri, method)) {
+            final Map<String, MetricDescriptor<?>> metricDescriptors = new 
HashMap<>();
+            for (final ProcessorStatusDescriptor descriptor : 
ProcessorStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), 
descriptor.getDescriptor());
+            }
+
+            clientResponse = mergeStatusHistoryResponses(clientResponse, 
updatedNodesMap, problematicNodeResponses, metricDescriptors);
+        } else if (hasSuccessfulClientResponse && 
isConnectionStatusHistoryEndpoint(uri, method)) {
+            final Map<String, MetricDescriptor<?>> metricDescriptors = new 
HashMap<>();
+            for (final ConnectionStatusDescriptor descriptor : 
ConnectionStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), 
descriptor.getDescriptor());
+            }
+
+            clientResponse = mergeStatusHistoryResponses(clientResponse, 
updatedNodesMap, problematicNodeResponses, metricDescriptors);
+        } else if (hasSuccessfulClientResponse && 
isProcessGroupStatusHistoryEndpoint(uri, method)) {
+            final Map<String, MetricDescriptor<?>> metricDescriptors = new 
HashMap<>();
+            for (final ProcessGroupStatusDescriptor descriptor : 
ProcessGroupStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), 
descriptor.getDescriptor());
+            }
+
+            clientResponse = mergeStatusHistoryResponses(clientResponse, 
updatedNodesMap, problematicNodeResponses, metricDescriptors);
+        } else if (hasSuccessfulClientResponse && 
isRemoteProcessGroupStatusHistoryEndpoint(uri, method)) {
+            final Map<String, MetricDescriptor<?>> metricDescriptors = new 
HashMap<>();
+            for (final RemoteProcessGroupStatusDescriptor descriptor : 
RemoteProcessGroupStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), 
descriptor.getDescriptor());
+            }
+
+            clientResponse = mergeStatusHistoryResponses(clientResponse, 
updatedNodesMap, problematicNodeResponses, metricDescriptors);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);
@@ -3603,6 +3842,44 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return clientResponse;
     }
 
+
    /**
     * Merges per-node status history responses into a single cluster-wide history.
     * Each healthy node's snapshots are keyed by its "host:port" address, an
     * aggregate series is computed across nodes, and component details / field
     * descriptors are taken from the last node response processed.
     *
     * @param clientResponse the response being returned to the client
     * @param updatedNodesMap all node responses for this request
     * @param problematicNodeResponses responses that failed and must be excluded
     * @param metricDescriptors the metric fields to aggregate, keyed by field name
     * @return a new NodeResponse wrapping the merged, cluster-wide history entity
     */
    private NodeResponse mergeStatusHistoryResponses(NodeResponse clientResponse, Map<Node, NodeResponse> updatedNodesMap, Set<NodeResponse> problematicNodeResponses,
        Map<String, MetricDescriptor<?>> metricDescriptors) {
        // the client response's entity is read once here and reused below so its
        // stream is not consumed twice
        final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class);

        StatusHistoryDTO lastStatusHistory = null;
        final Map<String, List<StatusSnapshotDTO>> nodeStatusHistories = new HashMap<>(updatedNodesMap.size());
        for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
            // skip nodes whose responses failed
            if (problematicNodeResponses.contains(nodeResponse)) {
                continue;
            }

            final StatusHistoryEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
            final StatusHistoryDTO nodeStatus = nodeResponseEntity.getStatusHistory();
            // remember the most recent history seen; any node's component details and
            // field descriptors are assumed equivalent — TODO confirm
            lastStatusHistory = nodeStatus;

            final NodeIdentifier nodeId = nodeResponse.getNodeId();
            final String nodeName = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
            nodeStatusHistories.put(nodeName, nodeStatus.getAggregateStatusSnapshots());
        }

        // build the cluster-wide history: aggregate across nodes plus per-node series
        // NOTE(review): mergeStatusHistories is a sibling defined elsewhere in this file
        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
        clusterStatusHistory.setAggregateStatusSnapshots(mergeStatusHistories(nodeStatusHistories, metricDescriptors));
        clusterStatusHistory.setGenerated(new Date());
        clusterStatusHistory.setNodeStatusSnapshots(nodeStatusHistories);
        if (lastStatusHistory != null) {
            clusterStatusHistory.setComponentDetails(lastStatusHistory.getComponentDetails());
            clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors());
        }

        // wrap in a new entity, preserving the client's revision
        final StatusHistoryEntity clusterEntity = new StatusHistoryEntity();
        clusterEntity.setStatusHistory(clusterStatusHistory);
        clusterEntity.setRevision(responseEntity.getRevision());

        return new NodeResponse(clientResponse, clusterEntity);
    }
+
+
     /**
      * Determines if all problematic responses were due to 404 NOT_FOUND. 
Assumes that problematicNodeResponses is not empty and is not comprised of 
responses from all nodes in the cluster (at least
      * one node contained the counter in question).
@@ -4026,78 +4303,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return bulletinRepository;
     }
 
-    @Override
-    public ProcessGroupStatus getProcessGroupStatus(final String groupId) {
-        final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED);
-
-        // ensure there are some nodes in the cluster
-        if (connectedNodes.isEmpty()) {
-            throw new NoConnectedNodesException();
-        }
-
-        ProcessGroupStatus mergedProcessGroupStatus = null;
-        for (final Node node : connectedNodes) {
-            final NodeIdentifier nodeId = node.getNodeId();
-            final HeartbeatPayload nodeHeartbeatPayload = 
node.getHeartbeatPayload();
-            if (nodeHeartbeatPayload == null) {
-                continue;
-            }
-            final ProcessGroupStatus nodeRootProcessGroupStatus = 
nodeHeartbeatPayload.getProcessGroupStatus();
-            final ProcessGroupStatus nodeProcessGroupStatus = 
groupId.equals(ROOT_GROUP_ID_ALIAS) ? nodeRootProcessGroupStatus : 
getProcessGroupStatus(nodeRootProcessGroupStatus, groupId);
-            if (nodeProcessGroupStatus == null) {
-                continue;
-            }
-
-            if (mergedProcessGroupStatus == null) {
-                mergedProcessGroupStatus = nodeProcessGroupStatus.clone();
-
-                // update any  issues with the node label
-                if (mergedProcessGroupStatus.getRemoteProcessGroupStatus() != 
null) {
-                    for (final RemoteProcessGroupStatus 
remoteProcessGroupStatus : 
mergedProcessGroupStatus.getRemoteProcessGroupStatus()) {
-                        final List<String> nodeAuthorizationIssues = 
remoteProcessGroupStatus.getAuthorizationIssues();
-                        if (!nodeAuthorizationIssues.isEmpty()) {
-                            for (final ListIterator<String> iter = 
nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
-                                final String Issue = iter.next();
-                                iter.set("[" + nodeId.getApiAddress() + ":" + 
nodeId.getApiPort() + "] -- " + Issue);
-                            }
-                            
remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
-                        }
-                    }
-                }
-            } else {
-                final ProcessGroupStatus nodeClone = 
nodeProcessGroupStatus.clone();
-                for (final RemoteProcessGroupStatus remoteProcessGroupStatus : 
nodeClone.getRemoteProcessGroupStatus()) {
-                    final List<String> nodeAuthorizationIssues = 
remoteProcessGroupStatus.getAuthorizationIssues();
-                    if (!nodeAuthorizationIssues.isEmpty()) {
-                        for (final ListIterator<String> iter = 
nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
-                            final String Issue = iter.next();
-                            iter.set("[" + nodeId.getApiAddress() + ":" + 
nodeId.getApiPort() + "] -- " + Issue);
-                        }
-                        
remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
-                    }
-                }
-
-                ProcessGroupStatus.merge(mergedProcessGroupStatus, nodeClone);
-            }
-        }
-
-        return mergedProcessGroupStatus;
-    }
-
-    private ProcessGroupStatus getProcessGroupStatus(final ProcessGroupStatus 
parent, final String groupId) {
-        if (parent.getId().equals(groupId)) {
-            return parent;
-        }
-
-        for (final ProcessGroupStatus child : parent.getProcessGroupStatus()) {
-            final ProcessGroupStatus matching = getProcessGroupStatus(child, 
groupId);
-            if (matching != null) {
-                return matching;
-            }
-        }
-
-        return null;
-    }
 
     @Override
     public SystemDiagnostics getSystemDiagnostics() {
@@ -4214,19 +4419,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return new Date(time - time % numMillis);
     }
 
-    private NodeDTO createNodeDTO(final Node node) {
-        final NodeDTO nodeDto = new NodeDTO();
-        final NodeIdentifier nodeId = node.getNodeId();
-        nodeDto.setNodeId(nodeId.getId());
-        nodeDto.setAddress(nodeId.getApiAddress());
-        nodeDto.setApiPort(nodeId.getApiPort());
-        nodeDto.setStatus(node.getStatus().name());
-        nodeDto.setPrimary(node.equals(getPrimaryNode()));
-        final Date connectionRequested = new 
Date(node.getConnectionRequestedTimestamp());
-        nodeDto.setConnectionRequested(connectionRequested);
-
-        return nodeDto;
-    }
 
     private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> 
snapshotsToAggregate) {
         // Aggregate the snapshots
@@ -4245,185 +4437,34 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return aggregatedSnapshotDtos;
     }
 
-    public ClusterStatusHistoryDTO getProcessorStatusHistory(final String 
processorId) {
-        return getProcessorStatusHistory(processorId, null, null, 
Integer.MAX_VALUE);
-    }
-
-    public ClusterStatusHistoryDTO getProcessorStatusHistory(final String 
processorId, final Date startDate, final Date endDate, final int 
preferredDataPoints) {
-        final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
-
-        StatusHistoryDTO lastStatusHistory = null;
-        final Set<MetricDescriptor<?>> processorDescriptors = new 
LinkedHashSet<>();
-        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
 
-        for (final Node node : getRawNodes()) {
-            final ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
-            if (statusRepository == null) {
-                continue;
-            }
-
-            final StatusHistory statusHistory = 
statusRepository.getProcessorStatusHistory(processorId, startDate, endDate, 
preferredDataPoints);
-            if (statusHistory == null) {
-                continue;
-            }
+    private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, 
final Map<String, MetricDescriptor<?>> metricDescriptors) {
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
+        snapshot.setTimestamp(snapshotDto.getTimestamp());
 
-            
processorDescriptors.addAll(statusRepository.getProcessorMetricDescriptors());
+        final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
+        for (final Map.Entry<String, Long> entry : metrics.entrySet()) {
+            final String metricId = entry.getKey();
+            final Long value = entry.getValue();
 
-            // record the status history (last) to get the component details 
for use later
-            final StatusHistoryDTO statusHistoryDto = 
createStatusHistoryDto(statusHistory);
-            lastStatusHistory = statusHistoryDto;
-
-            final NodeStatusHistoryDTO nodeHistory = new 
NodeStatusHistoryDTO();
-            nodeHistory.setStatusHistory(statusHistoryDto);
-            nodeHistory.setNode(createNodeDTO(node));
-            nodeHistories.add(nodeHistory);
-
-            // collect all of the snapshots to aggregate
-            for (final StatusSnapshot snapshot : 
statusHistory.getStatusSnapshots()) {
-                final Date normalizedDate = 
normalizeStatusSnapshotDate(snapshot.getTimestamp(), 
componentStatusSnapshotMillis);
-                List<StatusSnapshot> snapshots = 
snapshotsToAggregate.get(normalizedDate);
-                if (snapshots == null) {
-                    snapshots = new ArrayList<>();
-                    snapshotsToAggregate.put(normalizedDate, snapshots);
-                }
-                snapshots.add(snapshot);
-            }
-        }
-
-        // Aggregate the snapshots
-        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = 
aggregate(snapshotsToAggregate);
-
-        // get the details for this component from the last status history
-        final LinkedHashMap<String, String> clusterStatusHistoryDetails = new 
LinkedHashMap<>();
-        clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
-        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
-        clusterStatusHistory.setGenerated(new Date());
-        
clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processorDescriptors));
-        clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
-        clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
-        final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
-        history.setGenerated(new Date());
-        history.setNodeStatusHistory(nodeHistories);
-        history.setClusterStatusHistory(clusterStatusHistory);
-        return history;
-    }
-
-    public StatusHistoryDTO createStatusHistoryDto(final StatusHistory 
statusHistory) {
-        final StatusHistoryDTO dto = new StatusHistoryDTO();
-
-        dto.setDetails(new 
LinkedHashMap<>(statusHistory.getComponentDetails()));
-        
dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(statusHistory));
-        dto.setGenerated(statusHistory.getDateGenerated());
-
-        final List<StatusSnapshotDTO> statusSnapshots = new ArrayList<>();
-        for (final StatusSnapshot statusSnapshot : 
statusHistory.getStatusSnapshots()) {
-            
statusSnapshots.add(StatusHistoryUtil.createStatusSnapshotDto(statusSnapshot));
-        }
-        dto.setStatusSnapshots(statusSnapshots);
-
-        return dto;
-    }
-
-    public ClusterStatusHistoryDTO getConnectionStatusHistory(final String 
connectionId) {
-        return getConnectionStatusHistory(connectionId, null, null, 
Integer.MAX_VALUE);
-    }
-
-    public ClusterStatusHistoryDTO getConnectionStatusHistory(final String 
connectionId, final Date startDate, final Date endDate, final int 
preferredDataPoints) {
-        final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
-
-        StatusHistoryDTO lastStatusHistory = null;
-        final Set<MetricDescriptor<?>> connectionDescriptors = new 
LinkedHashSet<>();
-        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
-
-        for (final Node node : getRawNodes()) {
-            final ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
-            if (statusRepository == null) {
-                continue;
-            }
-
-            final StatusHistory statusHistory = 
statusRepository.getConnectionStatusHistory(connectionId, startDate, endDate, 
preferredDataPoints);
-            if (statusHistory == null) {
-                continue;
-            }
-
-            final StatusHistoryDTO statusHistoryDto = 
createStatusHistoryDto(statusHistory);
-            // record the status history (last) to get the componet details 
for use later
-            lastStatusHistory = statusHistoryDto;
-            
connectionDescriptors.addAll(statusRepository.getConnectionMetricDescriptors());
-
-            final NodeStatusHistoryDTO nodeHistory = new 
NodeStatusHistoryDTO();
-            nodeHistory.setStatusHistory(statusHistoryDto);
-            nodeHistory.setNode(createNodeDTO(node));
-            nodeHistories.add(nodeHistory);
-
-            // collect all of the snapshots to aggregate
-            for (final StatusSnapshot snapshot : 
statusHistory.getStatusSnapshots()) {
-                final Date normalizedDate = 
normalizeStatusSnapshotDate(snapshot.getTimestamp(), 
componentStatusSnapshotMillis);
-                List<StatusSnapshot> snapshots = 
snapshotsToAggregate.get(normalizedDate);
-                if (snapshots == null) {
-                    snapshots = new ArrayList<>();
-                    snapshotsToAggregate.put(normalizedDate, snapshots);
-                }
-                snapshots.add(snapshot);
+            final MetricDescriptor<?> descriptor = 
metricDescriptors.get(metricId);
+            if (descriptor != null) {
+                snapshot.addStatusMetric(descriptor, value);
             }
         }
 
-        // Aggregate the snapshots
-        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = 
aggregate(snapshotsToAggregate);
-
-        // get the details for this component from the last status history
-        final LinkedHashMap<String, String> clusterStatusHistoryDetails = new 
LinkedHashMap<>();
-        clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
-        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
-        clusterStatusHistory.setGenerated(new Date());
-        
clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(connectionDescriptors));
-        clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
-        clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
-        final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
-        history.setGenerated(new Date());
-        history.setNodeStatusHistory(nodeHistories);
-        history.setClusterStatusHistory(clusterStatusHistory);
-        return history;
-    }
-
-    public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String 
processGroupId) {
-        return getProcessGroupStatusHistory(processGroupId, null, null, 
Integer.MAX_VALUE);
+        return snapshot;
     }
 
-    public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String 
processGroupId, final Date startDate, final Date endDate, final int 
preferredDataPoints) {
-        final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
-
-        StatusHistoryDTO lastStatusHistory = null;
-        final Set<MetricDescriptor<?>> processGroupDescriptors = new 
LinkedHashSet<>();
+    private List<StatusSnapshotDTO> mergeStatusHistories(final Map<String, 
List<StatusSnapshotDTO>> nodeStatusHistories, final Map<String, 
MetricDescriptor<?>> metricDescriptors) {
+        // Map of "normalized Date" (i.e., a time range, essentially) to all 
Snapshots for that time. The list
+        // will contain one snapshot for each node.
         final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
 
-        for (final Node node : getRawNodes()) {
-            final ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
-            if (statusRepository == null) {
-                continue;
-            }
-
-            final StatusHistory statusHistory = 
statusRepository.getProcessGroupStatusHistory(processGroupId, startDate, 
endDate, preferredDataPoints);
-            if (statusHistory == null) {
-                continue;
-            }
-
-            final StatusHistoryDTO statusHistoryDto = 
createStatusHistoryDto(statusHistory);
-            // record the status history (last) to get the componet details 
for use later
-            lastStatusHistory = statusHistoryDto;
-            
processGroupDescriptors.addAll(statusRepository.getProcessGroupMetricDescriptors());
-
-            final NodeStatusHistoryDTO nodeHistory = new 
NodeStatusHistoryDTO();
-            nodeHistory.setStatusHistory(statusHistoryDto);
-            nodeHistory.setNode(createNodeDTO(node));
-            nodeHistories.add(nodeHistory);
-
-            // collect all of the snapshots to aggregate
-            for (final StatusSnapshot snapshot : 
statusHistory.getStatusSnapshots()) {
+        // group status snapshot's for each node by date
+        for (final List<StatusSnapshotDTO> snapshotDtos : 
nodeStatusHistories.values()) {
+            for (final StatusSnapshotDTO snapshotDto : snapshotDtos) {
+                final StatusSnapshot snapshot = createSnapshot(snapshotDto, 
metricDescriptors);
                 final Date normalizedDate = 
normalizeStatusSnapshotDate(snapshot.getTimestamp(), 
componentStatusSnapshotMillis);
                 List<StatusSnapshot> snapshots = 
snapshotsToAggregate.get(normalizedDate);
                 if (snapshots == null) {
@@ -4434,89 +4475,12 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             }
         }
 
-        // Aggregate the snapshots
-        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = 
aggregate(snapshotsToAggregate);
-
-        // get the details for this component from the last status history
-        final LinkedHashMap<String, String> clusterStatusHistoryDetails = new 
LinkedHashMap<>();
-        clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
-        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
-        clusterStatusHistory.setGenerated(new Date());
-        clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
-        
clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processGroupDescriptors));
-        clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
-        final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
-        history.setGenerated(new Date());
-        history.setNodeStatusHistory(nodeHistories);
-        history.setClusterStatusHistory(clusterStatusHistory);
-        return history;
+        // aggregate the snapshots by (normalized) timestamp
+        final List<StatusSnapshotDTO> aggregatedSnapshots = 
aggregate(snapshotsToAggregate);
+        return aggregatedSnapshots;
     }
 
-    public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final 
String remoteGroupId) {
-        return getRemoteProcessGroupStatusHistory(remoteGroupId, null, null, 
Integer.MAX_VALUE);
-    }
-
-    public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final 
String remoteGroupId, final Date startDate, final Date endDate, final int 
preferredDataPoints) {
-        final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
-
-        StatusHistoryDTO lastStatusHistory = null;
-        final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new 
LinkedHashSet<>();
-        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
-
-        for (final Node node : getRawNodes()) {
-            final ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
-            if (statusRepository == null) {
-                continue;
-            }
-
-            final StatusHistory statusHistory = 
statusRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startDate, 
endDate, preferredDataPoints);
-            if (statusHistory == null) {
-                continue;
-            }
-
-            final StatusHistoryDTO statusHistoryDto = 
createStatusHistoryDto(statusHistory);
-            // record the status history (last) to get the componet details 
for use later
-            lastStatusHistory = statusHistoryDto;
-            
remoteProcessGroupDescriptors.addAll(statusRepository.getRemoteProcessGroupMetricDescriptors());
 
-            final NodeStatusHistoryDTO nodeHistory = new 
NodeStatusHistoryDTO();
-            nodeHistory.setStatusHistory(statusHistoryDto);
-            nodeHistory.setNode(createNodeDTO(node));
-            nodeHistories.add(nodeHistory);
-
-            // collect all of the snapshots to aggregate
-            for (final StatusSnapshot snapshot : 
statusHistory.getStatusSnapshots()) {
-                final Date normalizedDate = 
normalizeStatusSnapshotDate(snapshot.getTimestamp(), 
componentStatusSnapshotMillis);
-                List<StatusSnapshot> snapshots = 
snapshotsToAggregate.get(normalizedDate);
-                if (snapshots == null) {
-                    snapshots = new ArrayList<>();
-                    snapshotsToAggregate.put(normalizedDate, snapshots);
-                }
-                snapshots.add(snapshot);
-            }
-        }
-
-        // Aggregate the snapshots
-        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = 
aggregate(snapshotsToAggregate);
-
-        // get the details for this component from the last status history
-        final LinkedHashMap<String, String> clusterStatusHistoryDetails = new 
LinkedHashMap<>();
-        clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
-        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
-        clusterStatusHistory.setGenerated(new Date());
-        clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
-        
clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(remoteProcessGroupDescriptors));
-        clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
-        final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
-        history.setGenerated(new Date());
-        history.setNodeStatusHistory(nodeHistories);
-        history.setClusterStatusHistory(clusterStatusHistory);
-        return history;
-    }
 
     private static class ClusterManagerLock {
 
@@ -4583,41 +4547,4 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     public Set<String> getControllerServiceIdentifiers(final Class<? extends 
ControllerService> serviceType) {
         return 
controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
     }
-
-    /**
-     * Captures snapshots of components' metrics
-     */
-    private class CaptureComponentMetrics implements Runnable {
-        @Override
-        public void run() {
-            readLock.lock();
-            try {
-                for (final Node node : nodes) {
-                    if (Status.CONNECTED.equals(node.getStatus())) {
-                        ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
-                        if (statusRepository == null) {
-                            statusRepository = 
createComponentStatusRepository();
-                            
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
-                        }
-
-                        // ensure this node has a payload
-                        if (node.getHeartbeat() != null && 
node.getHeartbeatPayload() != null) {
-                            // if nothing has been captured or the current 
heartbeat is newer, capture it - comparing the heatbeat created timestamp
-                            // is safe since its marked as XmlTransient so 
we're assured that its based off the same clock that created the last capture 
date
-                            if (statusRepository.getLastCaptureDate() == null 
|| node.getHeartbeat().getCreatedTimestamp() > 
statusRepository.getLastCaptureDate().getTime()) {
-                                
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
-                            }
-                        }
-                    }
-                }
-            } catch (final Throwable t) {
-                logger.warn("Unable to capture component metrics from Node 
heartbeats: " + t);
-                if (logger.isDebugEnabled()) {
-                    logger.warn("", t);
-                }
-            } finally {
-                readLock.unlock("capture component metrics from node 
heartbeats");
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 6d09bf6..7ea0408 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,7 +16,40 @@
  */
 package org.apache.nifi.controller;
 
-import com.sun.jersey.api.client.ClientHandlerException;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.admin.service.AuditService;
@@ -27,16 +60,13 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
@@ -109,7 +139,6 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.events.NodeBulletinProcessingStrategy;
 import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -182,39 +211,7 @@ import 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
+import com.sun.jersey.api.client.ClientHandlerException;
 
 public class FlowController implements EventAccess, ControllerServiceProvider, 
ReportingTaskProvider, Heartbeater, QueueProvider {
 
@@ -306,7 +303,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     /**
      * timer to periodically send heartbeats to the cluster
      */
-    private ScheduledFuture<?> bulletinFuture;
     private ScheduledFuture<?> heartbeatGeneratorFuture;
     private ScheduledFuture<?> heartbeatSenderFuture;
 
@@ -316,8 +312,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      */
     private final AtomicReference<HeartbeatMessageGeneratorTask> 
heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
 
-    private final AtomicReference<NodeBulletinProcessingStrategy> 
nodeBulletinSubscriber;
-
     // guarded by rwLock
     /**
      * the node identifier;
@@ -418,7 +412,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         counterRepositoryRef = new AtomicReference<CounterRepository>(new 
StandardCounterRepository());
 
         bulletinRepository = new VolatileBulletinRepository();
-        nodeBulletinSubscriber = new AtomicReference<>();
 
         try {
             this.provenanceEventRepository = 
createProvenanceRepository(properties);
@@ -2953,8 +2946,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
             stopHeartbeating();
 
-            bulletinFuture = clusterTaskExecutor.scheduleWithFixedDelay(new 
BulletinsTask(protocolSender), 250, 2000, TimeUnit.MILLISECONDS);
-
             final HeartbeatMessageGeneratorTask heartbeatMessageGeneratorTask 
= new HeartbeatMessageGeneratorTask();
             
heartbeatMessageGeneratorTaskRef.set(heartbeatMessageGeneratorTask);
             heartbeatGeneratorFuture = 
clusterTaskExecutor.scheduleWithFixedDelay(heartbeatMessageGeneratorTask, 0, 
heartbeatDelaySeconds, TimeUnit.SECONDS);
@@ -3005,10 +2996,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             if (heartbeatSenderFuture != null) {
                 heartbeatSenderFuture.cancel(false);
             }
-
-            if (bulletinFuture != null) {
-                bulletinFuture.cancel(false);
-            }
         } finally {
             writeLock.unlock();
         }
@@ -3133,8 +3120,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // update the bulletin repository
             if (isChanging) {
                 if (clustered) {
-                    nodeBulletinSubscriber.set(new 
NodeBulletinProcessingStrategy());
-                    
bulletinRepository.overrideDefaultBulletinProcessing(nodeBulletinSubscriber.get());
                     stateManagerProvider.enableClusterProvider();
 
                     if (zooKeeperStateServer != null) {
@@ -3173,7 +3158,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                         LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                     }
                 } else {
-                    bulletinRepository.restoreDefaultBulletinProcessing();
                     if (zooKeeperStateServer != null) {
                         zooKeeperStateServer.shutdown();
                     }
@@ -3472,6 +3456,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return replayFlowFile(record, requestor);
     }
 
+    @SuppressWarnings("deprecation")
     public ProvenanceEventRecord replayFlowFile(final ProvenanceEventRecord 
event, final String requestor) throws IOException {
         if (event == null) {
             throw new NullPointerException();
@@ -3627,110 +3612,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
-    private class BulletinsTask implements Runnable {
-
-        private final NodeProtocolSender protocolSender;
-        private final DateFormat dateFormatter = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
-
-        public BulletinsTask(final NodeProtocolSender protocolSender) {
-            if (protocolSender == null) {
-                throw new IllegalArgumentException("NodeProtocolSender may not 
be null.");
-            }
-            this.protocolSender = protocolSender;
-        }
-
-        @Override
-        public void run() {
-            try {
-                final NodeBulletinsMessage message = createBulletinsMessage();
-                if (message == null) {
-                    return;
-                }
-
-                protocolSender.sendBulletins(message);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
-                        String.format(
-                            "Sending bulletins to cluster manager at %s",
-                            dateFormatter.format(new Date())));
-                }
-
-            } catch (final UnknownServiceAddressException usae) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(usae.getMessage());
-                }
-            } catch (final Exception ex) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Failed to send bulletins to cluster manager due 
to: " + ex, ex);
-                }
-            }
-        }
-
-        private boolean isIllegalXmlChar(final char c) {
-            return c < 0x20 && c != 0x09 && c != 0x0A && c != 0x0D;
-        }
-
-        private boolean containsIllegalXmlChars(final Bulletin bulletin) {
-            final String message = bulletin.getMessage();
-            for (int i = 0; i < message.length(); i++) {
-                final char c = message.charAt(i);
-                if (isIllegalXmlChar(c)) {
-                    return true;
-                }
-            }
-
-            return false;
-        }
-
-        private String stripIllegalXmlChars(final String value) {
-            final StringBuilder sb = new StringBuilder(value.length());
-            for (int i = 0; i < value.length(); i++) {
-                final char c = value.charAt(i);
-                sb.append(isIllegalXmlChar(c) ? '?' : c);
-            }
-
-            return sb.toString();
-        }
-
-        private NodeBulletinsMessage createBulletinsMessage() {
-            final Set<Bulletin> nodeBulletins = 
nodeBulletinSubscriber.get().getBulletins();
-            final Set<Bulletin> escapedNodeBulletins = new 
HashSet<>(nodeBulletins.size());
-
-            // ensure there are some bulletins to report
-            if (nodeBulletins.isEmpty()) {
-                return null;
-            }
-
-            for (final Bulletin bulletin : nodeBulletins) {
-                final Bulletin escapedBulletin;
-                if (containsIllegalXmlChars(bulletin)) {
-                    final String escapedBulletinMessage = 
stripIllegalXmlChars(bulletin.getMessage());
-
-                    if (bulletin.getGroupId() == null) {
-                        escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), 
escapedBulletinMessage);
-                    } else {
-                        escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), 
bulletin.getSourceType(),
-                            bulletin.getSourceName(), bulletin.getCategory(), 
bulletin.getLevel(), escapedBulletinMessage);
-                    }
-                } else {
-                    escapedBulletin = bulletin;
-                }
-
-                escapedNodeBulletins.add(escapedBulletin);
-            }
-
-            // create the bulletin payload
-            final BulletinsPayload payload = new BulletinsPayload();
-            payload.setBulletins(escapedNodeBulletins);
-
-            // create bulletin message
-            final NodeBulletins bulletins = new NodeBulletins(getNodeId(), 
payload.marshal());
-            final NodeBulletinsMessage message = new NodeBulletinsMessage();
-            message.setBulletins(bulletins);
-
-            return message;
-        }
-    }
 
     private class HeartbeatSendTask implements Runnable {
 
@@ -3819,7 +3700,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
                 hbPayload.setCounters(getCounters());
                 hbPayload.setSystemDiagnostics(getSystemDiagnostics());
-                hbPayload.setProcessGroupStatus(procGroupStatus);
 
                 // create heartbeat message
                 final Heartbeat heartbeat = new Heartbeat(getNodeId(), 
bean.isPrimary(), bean.isConnected(), hbPayload.marshal());

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
new file mode 100644
index 0000000..8b9f383
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+public enum ConnectionStatusDescriptor {
+    INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "inputBytes",
+        "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that were transferred to this 
Connection in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getInputBytes();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "inputCount",
+        "FlowFiles In (5 mins)",
+        "The number of FlowFiles that were transferred to this Connection in 
the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getInputCount());
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "outputBytes",
+        "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that were pulled from this 
Connection in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getOutputBytes();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "outputCount",
+        "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that were pulled from this Connection in the 
past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getOutputCount());
+            }
+        })),
+
+    QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "queuedBytes",
+        "Queued Bytes",
+        "The number of Bytes queued in this Connection",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getQueuedBytes();
+            }
+        })),
+
+    QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "queuedCount",
+        "Queued Count",
+        "The number of FlowFiles queued in this Connection",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getQueuedCount());
+            }
+        }));
+
+
+    private MetricDescriptor<ConnectionStatus> descriptor;
+
+    private ConnectionStatusDescriptor(final 
MetricDescriptor<ConnectionStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    public MetricDescriptor<ConnectionStatus> getDescriptor() {
+        return descriptor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
new file mode 100644
index 0000000..d5325d0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+public enum ProcessGroupStatusDescriptor {
+
+    BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", 
"Bytes Read (5 mins)",
+        "The total number of bytes read from Content Repository by Processors 
in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesRead();
+            }
+        })),
+
+    BYTES_WRITTEN(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 
mins)",
+        "The total number of bytes written to Content Repository by Processors 
in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesWritten();
+            }
+        })),
+
+    BYTES_TRANSFERRED(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes 
Transferred (5 mins)",
+        "The total number of bytes read from or written to Content Repository 
by Processors in this Process Group in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesRead() + status.getBytesWritten();
+            }
+        })),
+
+    INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", 
"Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that have entered this Process 
Group via its Input Ports in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getInputContentSize();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", 
"FlowFiles In (5 mins)",
+        "The number of FlowFiles that have entered this Process Group via its 
Input Ports in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getInputCount().longValue();
+            }
+        })),
+
+    OUTPUT_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 
mins)",
+        "The cumulative size of all FlowFiles that have exited this Process 
Group via its Output Ports in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getOutputContentSize();
+            }
+        })),
+
+    OUTPUT_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 
mins)",
+        "The number of FlowFiles that have exited this Process Group via its 
Output Ports in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getOutputCount().longValue();
+            }
+        })),
+
+    QUEUED_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
+        "The cumulative size of all FlowFiles queued in all Connections of 
this Process Group",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getQueuedContentSize();
+            }
+        })),
+
+    QUEUED_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
+        "The number of FlowFiles queued in all Connections of this Process 
Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getQueuedCount().longValue();
+            }
+        })),
+
+    TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", 
"Total Task Duration (5 mins)",
+        "The total number of thread-milliseconds that the Processors within 
this ProcessGroup have used to complete their tasks in the past 5 minutes",
+        Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return calculateTaskMillis(status);
+            }
+        }));
+
+    private MetricDescriptor<ProcessGroupStatus> descriptor;
+
+    private ProcessGroupStatusDescriptor(final 
MetricDescriptor<ProcessGroupStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    public MetricDescriptor<ProcessGroupStatus> getDescriptor() {
+        return descriptor;
+    }
+
+
+    private static long calculateTaskMillis(final ProcessGroupStatus status) {
+        long nanos = 0L;
+
+        for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
+            nanos += procStatus.getProcessingNanos();
+        }
+
+        for (final ProcessGroupStatus childStatus : 
status.getProcessGroupStatus()) {
+            nanos += calculateTaskMillis(childStatus);
+        }
+
+        return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
+    }
+}

Reply via email to