http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index a67e74b..96beff5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -23,41 +23,23 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.FlowSnippetEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
@@ -67,9 +49,33 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.DoubleParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 /**
  * RESTful endpoint for managing a Group.
  */
@@ -979,7 +985,7 @@ public class ProcessGroupResource extends 
ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, 
putUri, updateClientId(processGroupEntity), 
getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, 
updateClientId(processGroupEntity), 
getHeaders(headersToOverride)).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)
@@ -1251,7 +1257,7 @@ public class ProcessGroupResource extends 
ApplicationResource {
      * Retrieves the status report for this NiFi.
      *
      * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param recursive Optional recursive flag that defaults to false. If set 
to true, all descendent groups and their content will be included if the 
verbose flag is also set to true.
+     * @param recursive Optional recursive flag that defaults to false. If set 
to true, all descendant groups and the status of their content will be included.
      * @return A processGroupStatusEntity.
      */
     @GET
@@ -1281,21 +1287,69 @@ public class ProcessGroupResource extends 
ApplicationResource {
             }
     )
     public Response getProcessGroupStatus(
+            @ApiParam(
+                value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
+                required = false
+            )
             @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean 
recursive) {
+            @ApiParam(
+                value = "Whether all descendant groups and the status of their 
content will be included. Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean 
recursive,
+            @ApiParam(
+                value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                value = "The id of the node where to get the status.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = 
clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), 
getRequestParameters(true), getHeaders());
+                final ProcessGroupStatusEntity entity = 
(ProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and 
prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getProcessGroupStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
+        }
 
         // get the status
         final ProcessGroupStatusDTO statusReport = 
serviceFacade.getProcessGroupStatus(groupId);
 
         // prune the response as necessary
         if (!recursive) {
-            for (final ProcessGroupStatusDTO childProcessGroupStatus : 
statusReport.getProcessGroupStatus()) {
-                childProcessGroupStatus.setConnectionStatus(null);
-                childProcessGroupStatus.setProcessGroupStatus(null);
-                childProcessGroupStatus.setInputPortStatus(null);
-                childProcessGroupStatus.setOutputPortStatus(null);
-                childProcessGroupStatus.setProcessorStatus(null);
-                childProcessGroupStatus.setRemoteProcessGroupStatus(null);
+            pruneChildGroups(statusReport.getAggregateSnapshot());
+            if (statusReport.getNodeSnapshots() != null) {
+                for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : 
statusReport.getNodeSnapshots()) {
+                    pruneChildGroups(nodeSnapshot.getStatusSnapshot());
+                }
             }
         }
 
@@ -1312,6 +1366,17 @@ public class ProcessGroupResource extends 
ApplicationResource {
         return clusterContext(generateOkResponse(entity)).build();
     }
 
+    private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO 
snapshot) {
+        for (final ProcessGroupStatusSnapshotDTO childProcessGroupStatus : 
snapshot.getProcessGroupStatusSnapshots()) {
+            childProcessGroupStatus.setConnectionStatusSnapshots(null);
+            childProcessGroupStatus.setProcessGroupStatusSnapshots(null);
+            childProcessGroupStatus.setInputPortStatusSnapshots(null);
+            childProcessGroupStatus.setOutputPortStatusSnapshots(null);
+            childProcessGroupStatus.setProcessorStatusSnapshots(null);
+            childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null);
+        }
+    }
+
     /**
      * Retrieves the specified remote process groups status history.
      *
@@ -1345,7 +1410,7 @@ public class ProcessGroupResource extends 
ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            throw new IllegalClusterResourceRequestException("This request is 
only supported in standalone mode.");
+            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
         // get the specified processor status history

http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 9fb90b5..adede7b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -22,80 +22,77 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.ui.extension.UiExtension;
+import org.apache.nifi.ui.extension.UiExtensionMapping;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
-
+import org.apache.nifi.web.UiExtensionType;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
+import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.DoubleParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.ui.extension.UiExtension;
-import org.apache.nifi.ui.extension.UiExtensionMapping;
-import org.apache.nifi.web.UiExtensionType;
-import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
-import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 /**
  * RESTful endpoint for managing a Processor.
  */
 @Api(hidden = true)
 public class ProcessorResource extends ApplicationResource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ProcessorResource.class);
-
     private static final List<Long> POSSIBLE_RUN_DURATIONS = Arrays.asList(0L, 
25L, 50L, 100L, 250L, 500L, 1000L, 2000L);
 
     private NiFiServiceFacade serviceFacade;
@@ -328,7 +325,7 @@ public class ProcessorResource extends ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, 
putUri, updateClientId(processorEntity), 
getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, 
updateClientId(processorEntity), getHeaders(headersToOverride)).getResponse();
 
         }
 
@@ -424,6 +421,106 @@ public class ProcessorResource extends 
ApplicationResource {
     }
 
     /**
+     * Retrieves the specified processor status.
+     *
+     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
+     * @param id The id of the processor history to retrieve.
+     * @return A processorStatusEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{id}/status")
+    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets status for a processor",
+        response = ProcessorStatusEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+        }
+    )
+    public Response getProcessorStatus(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+        @ApiParam(
+            value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
+            value = "The processor id.",
+            required = true
+        )
+        @PathParam("id") String id) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = 
clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), 
getRequestParameters(true), getHeaders());
+                final ProcessorStatusEntity entity = (ProcessorStatusEntity) 
nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and 
prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getProcessorStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
+        }
+
+        // get the specified processor status
+        final ProcessorStatusDTO processorStatus = 
serviceFacade.getProcessorStatus(groupId, id);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // generate the response entity
+        final ProcessorStatusEntity entity = new ProcessorStatusEntity();
+        entity.setRevision(revision);
+        entity.setProcessorStatus(processorStatus);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
      * Retrieves the specified processor status history.
      *
      * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
@@ -467,7 +564,7 @@ public class ProcessorResource extends ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            throw new IllegalClusterResourceRequestException("This request is 
only supported in standalone mode.");
+            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
         // get the specified processor status history

http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index e466666..8fc6a2c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -22,64 +22,66 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
-import static org.apache.nifi.web.api.ApplicationResource.VERSION;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.entity.ConnectionsEntity;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.DoubleParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.web.api.entity.ConnectionsEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 /**
  * RESTful endpoint for managing a Remote group.
  */
 @Api(hidden = true)
 public class RemoteProcessGroupResource extends ApplicationResource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(RemoteProcessGroupResource.class);
-
     private static final String VERBOSE_DEFAULT_VALUE = "false";
 
     private NiFiServiceFacade serviceFacade;
@@ -257,6 +259,106 @@ public class RemoteProcessGroupResource extends 
ApplicationResource {
     }
 
     /**
+     * Retrieves the specified remote process group status.
+     *
+     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
+     * @param id The id of the processor history to retrieve.
+     * @return A remoteProcessGroupStatusEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{id}/status")
+    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets status for a remote process group",
+        response = ProcessorStatusEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+        }
+    )
+    public Response getRemoteProcessGroupStatus(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+        @ApiParam(
+            value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
+            value = "The remote process group id.",
+            required = true
+        )
+        @PathParam("id") String id) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = 
clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), 
getRequestParameters(true), getHeaders());
+                final RemoteProcessGroupStatusEntity entity = 
(RemoteProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and 
prune the response as necessary
+                if (entity != null && !nodewise) {
+                    
entity.getRemoteProcessGroupStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
+        }
+
+        // get the specified remote process group status
+        final RemoteProcessGroupStatusDTO remoteProcessGroupStatus = 
serviceFacade.getRemoteProcessGroupStatus(groupId, id);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // generate the response entity
+        final RemoteProcessGroupStatusEntity entity = new 
RemoteProcessGroupStatusEntity();
+        entity.setRevision(revision);
+        entity.setRemoteProcessGroupStatus(remoteProcessGroupStatus);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
      * Retrieves the specified remote process groups status history.
      *
      * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
@@ -300,7 +402,7 @@ public class RemoteProcessGroupResource extends 
ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            throw new IllegalClusterResourceRequestException("This request is 
only supported in standalone mode.");
+            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
         // get the specified processor status history
@@ -446,7 +548,7 @@ public class RemoteProcessGroupResource extends 
ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, 
putUri, updateClientId(remoteProcessGroupEntity), 
getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, 
updateClientId(remoteProcessGroupEntity), 
getHeaders(headersToOverride)).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)

http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
index d7b77b2..802f46f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
@@ -16,12 +16,6 @@
  */
 package org.apache.nifi.web.api;
 
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
@@ -30,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -49,39 +44,42 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.ui.extension.UiExtension;
+import org.apache.nifi.ui.extension.UiExtensionMapping;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.api.dto.ComponentStateDTO;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.entity.ComponentStateEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.ui.extension.UiExtension;
-import org.apache.nifi.ui.extension.UiExtensionMapping;
 import org.apache.nifi.web.UiExtensionType;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.api.request.LongParameter;
 import org.apache.nifi.web.util.Availability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+
 /**
  * RESTful endpoint for managing a Reporting Task.
  */
 @Api(hidden = true)
 public class ReportingTaskResource extends ApplicationResource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ReportingTaskResource.class);
-
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
@@ -344,7 +342,7 @@ public class ReportingTaskResource extends 
ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, 
putUri, updateClientId(reportingTaskEntity), 
getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, 
updateClientId(reportingTaskEntity), 
getHeaders(headersToOverride)).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)

http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
index 5c3c03a..1bde7bf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
@@ -22,25 +22,31 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
+import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.springframework.security.access.prepost.PreAuthorize;
+
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
-import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.security.access.prepost.PreAuthorize;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * RESTful endpoint for retrieving system diagnostics.
@@ -52,9 +58,10 @@ import 
org.springframework.security.access.prepost.PreAuthorize;
 )
 public class SystemDiagnosticsResource extends ApplicationResource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(SystemDiagnosticsResource.class);
-
     private NiFiServiceFacade serviceFacade;
+    private WebClusterManager clusterManager;
+    private NiFiProperties properties;
+
 
     /**
      * Gets the system diagnostics for this NiFi instance.
@@ -87,7 +94,49 @@ public class SystemDiagnosticsResource extends 
ApplicationResource {
                     value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
                     required = false
             )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId) {
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                value = "The id of the node where to get the status.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = 
clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), 
getRequestParameters(true), getHeaders());
+                final SystemDiagnosticsEntity entity = 
(SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and 
prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getSystemDiagnostics().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
+        }
 
         final SystemDiagnosticsDTO systemDiagnosticsDto = 
serviceFacade.getSystemDiagnostics();
 
@@ -108,4 +157,12 @@ public class SystemDiagnosticsResource extends 
ApplicationResource {
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;
     }
+
+    public void setClusterManager(WebClusterManager clusterManager) {
+        this.clusterManager = clusterManager;
+    }
+
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6157285..5e7a902 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -39,7 +39,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.WebApplicationException;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.component.details.ComponentDetails;
 import org.apache.nifi.action.component.details.ExtensionDetails;
@@ -61,6 +60,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.authorization.Authority;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.manager.StatusMerger;
 import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.components.AllowableValue;
@@ -145,11 +145,15 @@ import 
org.apache.nifi.web.api.dto.provenance.lineage.LineageResultsDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceLinkDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceNodeDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.StatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
 
 public final class DtoFactory {
 
@@ -161,8 +165,6 @@ public final class DtoFactory {
         }
     };
 
-    final static int MAX_BULLETINS_PER_COMPONENT = 5;
-
     private ControllerServiceLookup controllerServiceLookup;
 
     /**
@@ -323,7 +325,7 @@ public final class DtoFactory {
         final StateMapDTO dto = new StateMapDTO();
         dto.setScope(scope.toString());
 
-        final TreeMap<String, String> sortedState = new 
TreeMap(SortedStateUtils.getKeyComparator());
+        final TreeMap<String, String> sortedState = new 
TreeMap<>(SortedStateUtils.getKeyComparator());
         final Map<String, String> state = stateMap.toMap();
         sortedState.putAll(state);
 
@@ -349,8 +351,8 @@ public final class DtoFactory {
      * @param counterDtos dtos
      * @return dto
      */
-    public CountersDTO createCountersDto(final Collection<CounterDTO> 
counterDtos) {
-        final CountersDTO dto = new CountersDTO();
+    public CountersSnapshotDTO createCountersDto(final Collection<CounterDTO> 
counterDtos) {
+        final CountersSnapshotDTO dto = new CountersSnapshotDTO();
         dto.setCounters(counterDtos);
         dto.setGenerated(new Date());
         return dto;
@@ -709,13 +711,6 @@ public final class DtoFactory {
         return copy;
     }
 
-    private String formatCount(final Integer intStatus) {
-        return intStatus == null ? "-" : FormatUtils.formatCount(intStatus);
-    }
-
-    private String formatDataSize(final Long longStatus) {
-        return longStatus == null ? "-" : 
FormatUtils.formatDataSize(longStatus);
-    }
 
     public RemoteProcessGroupStatusDTO createRemoteProcessGroupStatusDto(final 
RemoteProcessGroupStatus remoteProcessGroupStatus) {
         final RemoteProcessGroupStatusDTO dto = new 
RemoteProcessGroupStatusDTO();
@@ -724,172 +719,120 @@ public final class DtoFactory {
         dto.setTargetUri(remoteProcessGroupStatus.getTargetUri());
         dto.setName(remoteProcessGroupStatus.getName());
         
dto.setTransmissionStatus(remoteProcessGroupStatus.getTransmissionStatus().toString());
-        
dto.setActiveThreadCount(remoteProcessGroupStatus.getActiveThreadCount());
-        dto.setSent(formatCount(remoteProcessGroupStatus.getSentCount()) + " / 
" + formatDataSize(remoteProcessGroupStatus.getSentContentSize()));
-        
dto.setReceived(formatCount(remoteProcessGroupStatus.getReceivedCount()) + " / 
" + formatDataSize(remoteProcessGroupStatus.getReceivedContentSize()));
-        
dto.setAuthorizationIssues(remoteProcessGroupStatus.getAuthorizationIssues());
+        dto.setStatsLastRefreshed(new Date());
+
+        final RemoteProcessGroupStatusSnapshotDTO snapshot = new 
RemoteProcessGroupStatusSnapshotDTO();
+        dto.setAggregateSnapshot(snapshot);
 
+        snapshot.setId(remoteProcessGroupStatus.getId());
+        snapshot.setGroupId(remoteProcessGroupStatus.getGroupId());
+        snapshot.setName(remoteProcessGroupStatus.getName());
+        snapshot.setTargetUri(remoteProcessGroupStatus.getTargetUri());
+        
snapshot.setTransmissionStatus(remoteProcessGroupStatus.getTransmissionStatus().toString());
+
+        
snapshot.setActiveThreadCount(remoteProcessGroupStatus.getActiveThreadCount());
+        snapshot.setFlowFilesSent(remoteProcessGroupStatus.getSentCount());
+        snapshot.setBytesSent(remoteProcessGroupStatus.getSentContentSize());
+        
snapshot.setFlowFilesReceived(remoteProcessGroupStatus.getReceivedCount());
+        
snapshot.setBytesReceived(remoteProcessGroupStatus.getReceivedContentSize());
+        
snapshot.setAuthorizationIssues(remoteProcessGroupStatus.getAuthorizationIssues());
+
+        StatusMerger.updatePrettyPrintedFields(snapshot);
         return dto;
     }
 
     public ProcessGroupStatusDTO createProcessGroupStatusDto(final 
BulletinRepository bulletinRepository, final ProcessGroupStatus 
processGroupStatus) {
-
         final ProcessGroupStatusDTO processGroupStatusDto = new 
ProcessGroupStatusDTO();
         processGroupStatusDto.setId(processGroupStatus.getId());
         processGroupStatusDto.setName(processGroupStatus.getName());
-        processGroupStatusDto.setStatsLastRefreshed(new 
Date(processGroupStatus.getCreationTimestamp()));
-        
processGroupStatusDto.setRead(formatDataSize(processGroupStatus.getBytesRead()));
-        
processGroupStatusDto.setWritten(formatDataSize(processGroupStatus.getBytesWritten()));
-        
processGroupStatusDto.setInput(formatCount(processGroupStatus.getInputCount()) 
+ " / " + formatDataSize(processGroupStatus.getInputContentSize()));
-        
processGroupStatusDto.setOutput(formatCount(processGroupStatus.getOutputCount())
 + " / " + formatDataSize(processGroupStatus.getOutputContentSize()));
-        
processGroupStatusDto.setTransferred(formatCount(processGroupStatus.getFlowFilesTransferred())
 + " / " + formatDataSize(processGroupStatus.getBytesTransferred()));
-        
processGroupStatusDto.setSent(formatCount(processGroupStatus.getFlowFilesSent())
 + " / " + formatDataSize(processGroupStatus.getBytesSent()));
-        
processGroupStatusDto.setReceived(formatCount(processGroupStatus.getFlowFilesReceived())
 + " / " + formatDataSize(processGroupStatus.getBytesReceived()));
-        
processGroupStatusDto.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
-
-        final String queuedCount = 
FormatUtils.formatCount(processGroupStatus.getQueuedCount());
-        final String queuedSize = 
FormatUtils.formatDataSize(processGroupStatus.getQueuedContentSize());
-        processGroupStatusDto.setQueuedCount(queuedCount);
-        processGroupStatusDto.setQueuedSize(queuedSize);
-        processGroupStatusDto.setQueued(queuedCount + " / " + queuedSize);
-
-        final Map<String, StatusDTO> componentStatusDtoMap = new HashMap<>();
+        processGroupStatusDto.setStatsLastRefreshed(new Date());
+
+        final ProcessGroupStatusSnapshotDTO snapshot = new 
ProcessGroupStatusSnapshotDTO();
+        processGroupStatusDto.setAggregateSnapshot(snapshot);
+
+        snapshot.setId(processGroupStatus.getId());
+        snapshot.setName(processGroupStatus.getName());
+
+        snapshot.setFlowFilesQueued(processGroupStatus.getQueuedCount());
+        snapshot.setBytesQueued(processGroupStatus.getQueuedContentSize());
+        snapshot.setBytesRead(processGroupStatus.getBytesRead());
+        snapshot.setBytesWritten(processGroupStatus.getBytesWritten());
+        snapshot.setFlowFilesIn(processGroupStatus.getInputCount());
+        snapshot.setBytesIn(processGroupStatus.getInputContentSize());
+        snapshot.setFlowFilesOut(processGroupStatus.getOutputCount());
+        snapshot.setBytesOut(processGroupStatus.getOutputContentSize());
+        
snapshot.setFlowFilesTransferred(processGroupStatus.getFlowFilesTransferred());
+        snapshot.setBytesTransferred(processGroupStatus.getBytesTransferred());
+        snapshot.setFlowFilesSent(processGroupStatus.getFlowFilesSent());
+        snapshot.setBytesSent(processGroupStatus.getBytesSent());
+        
snapshot.setFlowFilesReceived(processGroupStatus.getFlowFilesReceived());
+        snapshot.setBytesReceived(processGroupStatus.getBytesReceived());
+        
snapshot.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
+        StatusMerger.updatePrettyPrintedFields(snapshot);
 
         // processor status
-        final Collection<ProcessorStatusDTO> processorStatDtoCollection = new 
ArrayList<>();
-        processGroupStatusDto.setProcessorStatus(processorStatDtoCollection);
+        final Collection<ProcessorStatusSnapshotDTO> 
processorStatDtoCollection = new ArrayList<>();
+        snapshot.setProcessorStatusSnapshots(processorStatDtoCollection);
         final Collection<ProcessorStatus> processorStatusCollection = 
processGroupStatus.getProcessorStatus();
         if (processorStatusCollection != null) {
             for (final ProcessorStatus processorStatus : 
processorStatusCollection) {
                 final ProcessorStatusDTO processorStatusDto = 
createProcessorStatusDto(processorStatus);
-                processorStatDtoCollection.add(processorStatusDto);
-                componentStatusDtoMap.put(processorStatusDto.getId(), 
processorStatusDto);
+                
processorStatDtoCollection.add(processorStatusDto.getAggregateSnapshot());
             }
         }
 
         // connection status
-        final Collection<ConnectionStatusDTO> connectionStatusDtoCollection = 
new ArrayList<>();
-        
processGroupStatusDto.setConnectionStatus(connectionStatusDtoCollection);
+        final Collection<ConnectionStatusSnapshotDTO> 
connectionStatusDtoCollection = new ArrayList<>();
+        snapshot.setConnectionStatusSnapshots(connectionStatusDtoCollection);
         final Collection<ConnectionStatus> connectionStatusCollection = 
processGroupStatus.getConnectionStatus();
         if (connectionStatusCollection != null) {
             for (final ConnectionStatus connectionStatus : 
connectionStatusCollection) {
                 final ConnectionStatusDTO connectionStatusDto = 
createConnectionStatusDto(connectionStatus);
-                connectionStatusDtoCollection.add(connectionStatusDto);
+                
connectionStatusDtoCollection.add(connectionStatusDto.getAggregateSnapshot());
             }
         }
 
         // local child process groups
-        final Collection<ProcessGroupStatusDTO> 
childProcessGroupStatusDtoCollection = new ArrayList<>();
-        
processGroupStatusDto.setProcessGroupStatus(childProcessGroupStatusDtoCollection);
+        final Collection<ProcessGroupStatusSnapshotDTO> 
childProcessGroupStatusDtoCollection = new ArrayList<>();
+        
snapshot.setProcessGroupStatusSnapshots(childProcessGroupStatusDtoCollection);
         final Collection<ProcessGroupStatus> childProcessGroupStatusCollection 
= processGroupStatus.getProcessGroupStatus();
         if (childProcessGroupStatusCollection != null) {
             for (final ProcessGroupStatus childProcessGroupStatus : 
childProcessGroupStatusCollection) {
                 final ProcessGroupStatusDTO childProcessGroupStatusDto = 
createProcessGroupStatusDto(bulletinRepository, childProcessGroupStatus);
-                
childProcessGroupStatusDtoCollection.add(childProcessGroupStatusDto);
+                
childProcessGroupStatusDtoCollection.add(childProcessGroupStatusDto.getAggregateSnapshot());
             }
         }
 
         // remote child process groups
-        final Collection<RemoteProcessGroupStatusDTO> 
childRemoteProcessGroupStatusDtoCollection = new ArrayList<>();
-        
processGroupStatusDto.setRemoteProcessGroupStatus(childRemoteProcessGroupStatusDtoCollection);
+        final Collection<RemoteProcessGroupStatusSnapshotDTO> 
childRemoteProcessGroupStatusDtoCollection = new ArrayList<>();
+        
snapshot.setRemoteProcessGroupStatusSnapshots(childRemoteProcessGroupStatusDtoCollection);
         final Collection<RemoteProcessGroupStatus> 
childRemoteProcessGroupStatusCollection = 
processGroupStatus.getRemoteProcessGroupStatus();
         if (childRemoteProcessGroupStatusCollection != null) {
             for (final RemoteProcessGroupStatus childRemoteProcessGroupStatus 
: childRemoteProcessGroupStatusCollection) {
                 final RemoteProcessGroupStatusDTO 
childRemoteProcessGroupStatusDto = 
createRemoteProcessGroupStatusDto(childRemoteProcessGroupStatus);
-                
childRemoteProcessGroupStatusDtoCollection.add(childRemoteProcessGroupStatusDto);
-                
componentStatusDtoMap.put(childRemoteProcessGroupStatusDto.getId(), 
childRemoteProcessGroupStatusDto);
+                
childRemoteProcessGroupStatusDtoCollection.add(childRemoteProcessGroupStatusDto.getAggregateSnapshot());
             }
         }
 
         // input ports
-        final Collection<PortStatusDTO> inputPortStatusDtoCollection = new 
ArrayList<>();
-        processGroupStatusDto.setInputPortStatus(inputPortStatusDtoCollection);
+        final Collection<PortStatusSnapshotDTO> inputPortStatusDtoCollection = 
new ArrayList<>();
+        snapshot.setInputPortStatusSnapshots(inputPortStatusDtoCollection);
         final Collection<PortStatus> inputPortStatusCollection = 
processGroupStatus.getInputPortStatus();
         if (inputPortStatusCollection != null) {
             for (final PortStatus portStatus : inputPortStatusCollection) {
                 final PortStatusDTO portStatusDto = 
createPortStatusDto(portStatus);
-                inputPortStatusDtoCollection.add(portStatusDto);
-                componentStatusDtoMap.put(portStatusDto.getId(), 
portStatusDto);
+                
inputPortStatusDtoCollection.add(portStatusDto.getAggregateSnapshot());
             }
         }
 
         // output ports
-        final Collection<PortStatusDTO> outputPortStatusDtoCollection = new 
ArrayList<>();
-        
processGroupStatusDto.setOutputPortStatus(outputPortStatusDtoCollection);
+        final Collection<PortStatusSnapshotDTO> outputPortStatusDtoCollection 
= new ArrayList<>();
+        snapshot.setOutputPortStatusSnapshots(outputPortStatusDtoCollection);
         final Collection<PortStatus> outputPortStatusCollection = 
processGroupStatus.getOutputPortStatus();
         if (outputPortStatusCollection != null) {
             for (final PortStatus portStatus : outputPortStatusCollection) {
                 final PortStatusDTO portStatusDto = 
createPortStatusDto(portStatus);
-                outputPortStatusDtoCollection.add(portStatusDto);
-                componentStatusDtoMap.put(portStatusDto.getId(), 
portStatusDto);
-            }
-        }
-
-        // get the bulletins for this group and associate with the specific 
child component
-        if (bulletinRepository != null) {
-            if (processGroupStatusDto.getBulletins() == null) {
-                processGroupStatusDto.setBulletins(new 
ArrayList<BulletinDTO>());
-            }
-
-            // locate bulletins for this process group
-            final List<Bulletin> results = 
bulletinRepository.findBulletinsForGroupBySource(processGroupStatus.getId(), 
MAX_BULLETINS_PER_COMPONENT);
-            for (final Bulletin bulletin : results) {
-                final StatusDTO status = 
componentStatusDtoMap.get(bulletin.getSourceId());
-
-                // ensure this connectable is still in the flow
-                if (status != null) {
-                    if (status.getBulletins() == null) {
-                        status.setBulletins(new ArrayList<BulletinDTO>());
-                    }
-
-                    // convert the result into a dto
-                    final BulletinDTO bulletinDto = 
createBulletinDto(bulletin);
-                    status.getBulletins().add(bulletinDto);
-
-                    // create a copy for the parent group
-                    final BulletinDTO copy = copy(bulletinDto);
-                    copy.setGroupId(StringUtils.EMPTY);
-                    copy.setSourceId(processGroupStatus.getId());
-                    copy.setSourceName(processGroupStatus.getName());
-                    processGroupStatusDto.getBulletins().add(copy);
-                }
-            }
-
-            // copy over descendant bulletins
-            for (final ProcessGroupStatusDTO childProcessGroupStatusDto : 
processGroupStatusDto.getProcessGroupStatus()) {
-                if (childProcessGroupStatusDto.getBulletins() != null) {
-                    for (final BulletinDTO descendantBulletinDto : 
childProcessGroupStatusDto.getBulletins()) {
-                        // create a copy for the parent group
-                        final BulletinDTO copy = copy(descendantBulletinDto);
-                        copy.setGroupId(StringUtils.EMPTY);
-                        copy.setSourceId(processGroupStatus.getId());
-                        copy.setSourceName(processGroupStatus.getName());
-                        processGroupStatusDto.getBulletins().add(copy);
-                    }
-                }
-            }
-
-            // sort the bulletins
-            Collections.sort(processGroupStatusDto.getBulletins(), new 
Comparator<BulletinDTO>() {
-                @Override
-                public int compare(BulletinDTO o1, BulletinDTO o2) {
-                    if (o1 == null && o2 == null) {
-                        return 0;
-                    }
-                    if (o1 == null) {
-                        return 1;
-                    }
-                    if (o2 == null) {
-                        return -1;
-                    }
-
-                    return -Long.compare(o1.getId(), o2.getId());
-                }
-            });
-
-            // prune the response to only include the max number of bulletins
-            if (processGroupStatusDto.getBulletins().size() > 
MAX_BULLETINS_PER_COMPONENT) {
-                
processGroupStatusDto.setBulletins(processGroupStatusDto.getBulletins().subList(0,
 MAX_BULLETINS_PER_COMPONENT));
+                
outputPortStatusDtoCollection.add(portStatusDto.getAggregateSnapshot());
             }
         }
 
@@ -897,7 +840,6 @@ public final class DtoFactory {
     }
 
     public ConnectionStatusDTO createConnectionStatusDto(final 
ConnectionStatus connectionStatus) {
-
         final ConnectionStatusDTO connectionStatusDto = new 
ConnectionStatusDTO();
         connectionStatusDto.setGroupId(connectionStatus.getGroupId());
         connectionStatusDto.setId(connectionStatus.getId());
@@ -906,54 +848,64 @@ public final class DtoFactory {
         connectionStatusDto.setSourceName(connectionStatus.getSourceName());
         
connectionStatusDto.setDestinationId(connectionStatus.getDestinationId());
         
connectionStatusDto.setDestinationName(connectionStatus.getDestinationName());
+        connectionStatusDto.setStatsLastRefreshed(new Date());
+
+        final ConnectionStatusSnapshotDTO snapshot = new 
ConnectionStatusSnapshotDTO();
+        connectionStatusDto.setAggregateSnapshot(snapshot);
+
+        snapshot.setId(connectionStatus.getId());
+        snapshot.setGroupId(connectionStatus.getGroupId());
+        snapshot.setName(connectionStatus.getName());
+        snapshot.setSourceName(connectionStatus.getSourceName());
+        snapshot.setDestinationName(connectionStatus.getDestinationName());
 
-        final String queuedCount = 
FormatUtils.formatCount(connectionStatus.getQueuedCount());
-        final String queuedSize = 
FormatUtils.formatDataSize(connectionStatus.getQueuedBytes());
-        connectionStatusDto.setQueuedCount(queuedCount);
-        connectionStatusDto.setQueuedSize(queuedSize);
-        connectionStatusDto.setQueued(queuedCount + " / " + queuedSize);
+        snapshot.setFlowFilesQueued(connectionStatus.getQueuedCount());
+        snapshot.setBytesQueued(connectionStatus.getQueuedBytes());
 
-        final int inputCount = connectionStatus.getInputCount();
-        final long inputBytes = connectionStatus.getInputBytes();
-        connectionStatusDto.setInput(FormatUtils.formatCount(inputCount) + " / 
" + FormatUtils.formatDataSize(inputBytes));
+        snapshot.setFlowFilesIn(connectionStatus.getInputCount());
+        snapshot.setBytesIn(connectionStatus.getInputBytes());
 
-        final int outputCount = connectionStatus.getOutputCount();
-        final long outputBytes = connectionStatus.getOutputBytes();
-        connectionStatusDto.setOutput(FormatUtils.formatCount(outputCount) + " 
/ " + FormatUtils.formatDataSize(outputBytes));
+        snapshot.setFlowFilesOut(connectionStatus.getOutputCount());
+        snapshot.setBytesOut(connectionStatus.getOutputBytes());
+        StatusMerger.updatePrettyPrintedFields(snapshot);
 
         return connectionStatusDto;
     }
 
     public ProcessorStatusDTO createProcessorStatusDto(final ProcessorStatus 
procStatus) {
-
         final ProcessorStatusDTO dto = new ProcessorStatusDTO();
         dto.setId(procStatus.getId());
         dto.setGroupId(procStatus.getGroupId());
         dto.setName(procStatus.getName());
+        dto.setStatsLastRefreshed(new Date());
+
+        final ProcessorStatusSnapshotDTO snapshot = new 
ProcessorStatusSnapshotDTO();
+        dto.setAggregateSnapshot(snapshot);
 
-        final int processedCount = procStatus.getOutputCount();
-        final long numProcessedBytes = procStatus.getOutputBytes();
-        dto.setOutput(FormatUtils.formatCount(processedCount) + " / " + 
FormatUtils.formatDataSize(numProcessedBytes));
+        snapshot.setId(procStatus.getId());
+        snapshot.setGroupId(procStatus.getGroupId());
+        snapshot.setName(procStatus.getName());
 
-        final int inputCount = procStatus.getInputCount();
-        final long inputBytes = procStatus.getInputBytes();
-        dto.setInput(FormatUtils.formatCount(inputCount) + " / " + 
FormatUtils.formatDataSize(inputBytes));
+        snapshot.setFlowFilesOut(procStatus.getOutputCount());
+        snapshot.setBytesOut(procStatus.getOutputBytes());
 
-        final long readBytes = procStatus.getBytesRead();
-        dto.setRead(FormatUtils.formatDataSize(readBytes));
+        snapshot.setFlowFilesIn(procStatus.getInputCount());
+        snapshot.setBytesIn(procStatus.getInputBytes());
 
-        final long writtenBytes = procStatus.getBytesWritten();
-        dto.setWritten(FormatUtils.formatDataSize(writtenBytes));
+        snapshot.setBytesRead(procStatus.getBytesRead());
+        snapshot.setBytesWritten(procStatus.getBytesWritten());
 
-        
dto.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(procStatus.getProcessingNanos(),
 TimeUnit.NANOSECONDS));
-        dto.setTasks(FormatUtils.formatCount(procStatus.getInvocations()));
+        snapshot.setTaskCount(procStatus.getInvocations());
+        snapshot.setTasksDurationNanos(procStatus.getProcessingNanos());
+        
snapshot.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(procStatus.getProcessingNanos(),
 TimeUnit.NANOSECONDS));
 
         // determine the run status
-        dto.setRunStatus(procStatus.getRunStatus().toString());
+        snapshot.setRunStatus(procStatus.getRunStatus().toString());
 
-        dto.setActiveThreadCount(procStatus.getActiveThreadCount());
-        dto.setType(procStatus.getType());
+        snapshot.setActiveThreadCount(procStatus.getActiveThreadCount());
+        snapshot.setType(procStatus.getType());
 
+        StatusMerger.updatePrettyPrintedFields(snapshot);
         return dto;
     }
 
@@ -968,17 +920,25 @@ public final class DtoFactory {
         dto.setId(portStatus.getId());
         dto.setGroupId(portStatus.getGroupId());
         dto.setName(portStatus.getName());
-        dto.setActiveThreadCount(portStatus.getActiveThreadCount());
         dto.setRunStatus(portStatus.getRunStatus().toString());
         dto.setTransmitting(portStatus.isTransmitting());
+        dto.setStatsLastRefreshed(new Date());
 
-        final int processedCount = portStatus.getOutputCount();
-        final long numProcessedBytes = portStatus.getOutputBytes();
-        dto.setOutput(FormatUtils.formatCount(processedCount) + " / " + 
FormatUtils.formatDataSize(numProcessedBytes));
+        final PortStatusSnapshotDTO snapshot = new PortStatusSnapshotDTO();
+        dto.setAggregateSnapshot(snapshot);
 
-        final int inputCount = portStatus.getInputCount();
-        final long inputBytes = portStatus.getInputBytes();
-        dto.setInput(FormatUtils.formatCount(inputCount) + " / " + 
FormatUtils.formatDataSize(inputBytes));
+        snapshot.setId(portStatus.getId());
+        snapshot.setGroupId(portStatus.getGroupId());
+        snapshot.setName(portStatus.getName());
+        snapshot.setRunStatus(portStatus.getRunStatus().toString());
+
+        snapshot.setActiveThreadCount(portStatus.getActiveThreadCount());
+        snapshot.setFlowFilesOut(portStatus.getOutputCount());
+        snapshot.setBytesOut(portStatus.getOutputBytes());
+
+        snapshot.setFlowFilesIn(portStatus.getInputCount());
+        snapshot.setBytesIn(portStatus.getInputBytes());
+        StatusMerger.updatePrettyPrintedFields(snapshot);
 
         return dto;
     }
@@ -1766,6 +1726,7 @@ public final class DtoFactory {
      * @param node node
      * @return dto
      */
+    @SuppressWarnings("deprecation")
     public ProvenanceNodeDTO createProvenanceEventNodeDTO(final 
ProvenanceEventLineageNode node) {
         final ProvenanceNodeDTO dto = new ProvenanceNodeDTO();
         dto.setId(node.getIdentifier());
@@ -1786,6 +1747,7 @@ public final class DtoFactory {
      * @param node node
      * @return dto
      */
+    @SuppressWarnings("deprecation")
     public ProvenanceNodeDTO createFlowFileNodeDTO(final LineageNode node) {
         final ProvenanceNodeDTO dto = new ProvenanceNodeDTO();
         dto.setId(node.getIdentifier());
@@ -1906,48 +1868,59 @@ public final class DtoFactory {
     public SystemDiagnosticsDTO createSystemDiagnosticsDto(final 
SystemDiagnostics sysDiagnostics) {
 
         final SystemDiagnosticsDTO dto = new SystemDiagnosticsDTO();
-        dto.setStatsLastRefreshed(new 
Date(sysDiagnostics.getCreationTimestamp()));
+        final SystemDiagnosticsSnapshotDTO snapshot = new 
SystemDiagnosticsSnapshotDTO();
+        dto.setAggregateSnapshot(snapshot);
+
+        snapshot.setStatsLastRefreshed(new 
Date(sysDiagnostics.getCreationTimestamp()));
 
         // processors
-        dto.setAvailableProcessors(sysDiagnostics.getAvailableProcessors());
-        dto.setProcessorLoadAverage(sysDiagnostics.getProcessorLoadAverage());
+        
snapshot.setAvailableProcessors(sysDiagnostics.getAvailableProcessors());
+        
snapshot.setProcessorLoadAverage(sysDiagnostics.getProcessorLoadAverage());
 
         // threads
-        dto.setDaemonThreads(sysDiagnostics.getDaemonThreads());
-        dto.setTotalThreads(sysDiagnostics.getTotalThreads());
+        snapshot.setDaemonThreads(sysDiagnostics.getDaemonThreads());
+        snapshot.setTotalThreads(sysDiagnostics.getTotalThreads());
 
         // heap
-        
dto.setMaxHeap(FormatUtils.formatDataSize(sysDiagnostics.getMaxHeap()));
-        
dto.setTotalHeap(FormatUtils.formatDataSize(sysDiagnostics.getTotalHeap()));
-        
dto.setUsedHeap(FormatUtils.formatDataSize(sysDiagnostics.getUsedHeap()));
-        
dto.setFreeHeap(FormatUtils.formatDataSize(sysDiagnostics.getFreeHeap()));
+        
snapshot.setMaxHeap(FormatUtils.formatDataSize(sysDiagnostics.getMaxHeap()));
+        snapshot.setMaxHeapBytes(sysDiagnostics.getMaxHeap());
+        
snapshot.setTotalHeap(FormatUtils.formatDataSize(sysDiagnostics.getTotalHeap()));
+        snapshot.setTotalHeapBytes(sysDiagnostics.getTotalHeap());
+        
snapshot.setUsedHeap(FormatUtils.formatDataSize(sysDiagnostics.getUsedHeap()));
+        snapshot.setUsedHeapBytes(sysDiagnostics.getUsedHeap());
+        
snapshot.setFreeHeap(FormatUtils.formatDataSize(sysDiagnostics.getFreeHeap()));
+        snapshot.setFreeHeapBytes(sysDiagnostics.getFreeHeap());
         if (sysDiagnostics.getHeapUtilization() != -1) {
-            
dto.setHeapUtilization(FormatUtils.formatUtilization(sysDiagnostics.getHeapUtilization()));
+            
snapshot.setHeapUtilization(FormatUtils.formatUtilization(sysDiagnostics.getHeapUtilization()));
         }
 
         // non heap
-        
dto.setMaxNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getMaxNonHeap()));
-        
dto.setTotalNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getTotalNonHeap()));
-        
dto.setUsedNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getUsedNonHeap()));
-        
dto.setFreeNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getFreeNonHeap()));
+        
snapshot.setMaxNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getMaxNonHeap()));
+        snapshot.setMaxNonHeapBytes(sysDiagnostics.getMaxNonHeap());
+        
snapshot.setTotalNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getTotalNonHeap()));
+        snapshot.setTotalNonHeapBytes(sysDiagnostics.getTotalNonHeap());
+        
snapshot.setUsedNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getUsedNonHeap()));
+        snapshot.setUsedNonHeapBytes(sysDiagnostics.getUsedNonHeap());
+        
snapshot.setFreeNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getFreeNonHeap()));
+        snapshot.setFreeNonHeapBytes(sysDiagnostics.getFreeNonHeap());
         if (sysDiagnostics.getNonHeapUtilization() != -1) {
-            
dto.setNonHeapUtilization(FormatUtils.formatUtilization(sysDiagnostics.getNonHeapUtilization()));
+            
snapshot.setNonHeapUtilization(FormatUtils.formatUtilization(sysDiagnostics.getNonHeapUtilization()));
         }
 
         // flow file disk usage
-        final SystemDiagnosticsDTO.StorageUsageDTO 
flowFileRepositoryStorageUsageDto = createStorageUsageDTO(null, 
sysDiagnostics.getFlowFileRepositoryStorageUsage());
-        
dto.setFlowFileRepositoryStorageUsage(flowFileRepositoryStorageUsageDto);
+        final SystemDiagnosticsSnapshotDTO.StorageUsageDTO 
flowFileRepositoryStorageUsageDto = createStorageUsageDTO(null, 
sysDiagnostics.getFlowFileRepositoryStorageUsage());
+        
snapshot.setFlowFileRepositoryStorageUsage(flowFileRepositoryStorageUsageDto);
 
         // content disk usage
-        final Set<SystemDiagnosticsDTO.StorageUsageDTO> 
contentRepositoryStorageUsageDtos = new LinkedHashSet<>();
-        
dto.setContentRepositoryStorageUsage(contentRepositoryStorageUsageDtos);
+        final Set<SystemDiagnosticsSnapshotDTO.StorageUsageDTO> 
contentRepositoryStorageUsageDtos = new LinkedHashSet<>();
+        
snapshot.setContentRepositoryStorageUsage(contentRepositoryStorageUsageDtos);
         for (final Map.Entry<String, StorageUsage> entry : 
sysDiagnostics.getContentRepositoryStorageUsage().entrySet()) {
             
contentRepositoryStorageUsageDtos.add(createStorageUsageDTO(entry.getKey(), 
entry.getValue()));
         }
 
         // garbage collection
-        final Set<SystemDiagnosticsDTO.GarbageCollectionDTO> 
garbageCollectionDtos = new LinkedHashSet<>();
-        dto.setGarbageCollection(garbageCollectionDtos);
+        final Set<SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO> 
garbageCollectionDtos = new LinkedHashSet<>();
+        snapshot.setGarbageCollection(garbageCollectionDtos);
         for (final Map.Entry<String, GarbageCollection> entry : 
sysDiagnostics.getGarbageCollection().entrySet()) {
             
garbageCollectionDtos.add(createGarbageCollectionDTO(entry.getKey(), 
entry.getValue()));
         }
@@ -1962,8 +1935,8 @@ public final class DtoFactory {
      * @param storageUsage usage
      * @return dto
      */
-    public SystemDiagnosticsDTO.StorageUsageDTO createStorageUsageDTO(final 
String identifier, final StorageUsage storageUsage) {
-        final SystemDiagnosticsDTO.StorageUsageDTO dto = new 
SystemDiagnosticsDTO.StorageUsageDTO();
+    public SystemDiagnosticsSnapshotDTO.StorageUsageDTO 
createStorageUsageDTO(final String identifier, final StorageUsage storageUsage) 
{
+        final SystemDiagnosticsSnapshotDTO.StorageUsageDTO dto = new 
SystemDiagnosticsSnapshotDTO.StorageUsageDTO();
         dto.setIdentifier(identifier);
         
dto.setFreeSpace(FormatUtils.formatDataSize(storageUsage.getFreeSpace()));
         
dto.setTotalSpace(FormatUtils.formatDataSize(storageUsage.getTotalSpace()));
@@ -1982,11 +1955,12 @@ public final class DtoFactory {
      * @param garbageCollection gc
      * @return dto
      */
-    public SystemDiagnosticsDTO.GarbageCollectionDTO 
createGarbageCollectionDTO(final String name, final GarbageCollection 
garbageCollection) {
-        final SystemDiagnosticsDTO.GarbageCollectionDTO dto = new 
SystemDiagnosticsDTO.GarbageCollectionDTO();
+    public SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO 
createGarbageCollectionDTO(final String name, final GarbageCollection 
garbageCollection) {
+        final SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO dto = new 
SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO();
         dto.setName(name);
         dto.setCollectionCount(garbageCollection.getCollectionCount());
         
dto.setCollectionTime(FormatUtils.formatHoursMinutesSeconds(garbageCollection.getCollectionTime(),
 TimeUnit.MILLISECONDS));
+        dto.setCollectionMillis(garbageCollection.getCollectionTime());
         return dto;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index a349f2a..68d0dbe 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -37,7 +37,11 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.ContentNotFoundException;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -89,8 +93,12 @@ import 
org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO;
 import 
org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageRequestType;
 import org.apache.nifi.web.api.dto.search.ComponentSearchResultDTO;
 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.security.ProxiedEntitiesUtils;
 import org.apache.nifi.web.security.user.NiFiUserUtils;
@@ -439,6 +447,8 @@ public class ControllerFacade {
         final ControllerStatusDTO controllerStatus = new ControllerStatusDTO();
         
controllerStatus.setActiveThreadCount(flowController.getActiveThreadCount());
         
controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount())
 + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount()));
+        controllerStatus.setBytesQueued(controllerQueueSize.getByteCount());
+        
controllerStatus.setFlowFilesQueued(controllerQueueSize.getObjectCount());
 
         final BulletinRepository bulletinRepository = getBulletinRepository();
         
controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
@@ -477,6 +487,116 @@ public class ControllerFacade {
     }
 
     /**
+     * Gets the status for the specified processor.
+     *
+     * @param groupId group id
+     * @param processorId processor id
+     * @return the status for the specified processor
+     */
+    public ProcessorStatusDTO getProcessorStatus(final String groupId, final 
String processorId) {
+        final ProcessGroupStatus processGroupStatus = 
flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to 
locate group with id '%s'.", groupId));
+        }
+
+        for (final ProcessorStatus processorStatus : 
processGroupStatus.getProcessorStatus()) {
+            if (processorId.equals(processorStatus.getId())) {
+                return dtoFactory.createProcessorStatusDto(processorStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate 
processor with id '%s'.", processorId));
+    }
+
+    /**
+     * Gets the status for the specified connection.
+     *
+     * @param groupId group id
+     * @param connectionId connection id
+     * @return the status for the specified connection
+     */
+    public ConnectionStatusDTO getConnectionStatus(final String groupId, final 
String connectionId) {
+        final ProcessGroupStatus processGroupStatus = 
flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to 
locate group with id '%s'.", groupId));
+        }
+
+        for (final ConnectionStatus connectionStatus : 
processGroupStatus.getConnectionStatus()) {
+            if (connectionId.equals(connectionStatus.getId())) {
+                return dtoFactory.createConnectionStatusDto(connectionStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate 
connection with id '%s'.", connectionId));
+    }
+
+    /**
+     * Gets the status for the specified input port.
+     *
+     * @param groupId group id
+     * @param portId input port id
+     * @return the status for the specified input port
+     */
+    public PortStatusDTO getInputPortStatus(final String groupId, final String 
portId) {
+        final ProcessGroupStatus processGroupStatus = 
flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to 
locate group with id '%s'.", groupId));
+        }
+
+        for (final PortStatus portStatus : 
processGroupStatus.getInputPortStatus()) {
+            if (portId.equals(portStatus.getId())) {
+                return dtoFactory.createPortStatusDto(portStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate 
input port with id '%s'.", portId));
+    }
+
+    /**
+     * Gets the status for the specified output port.
+     *
+     * @param groupId group id
+     * @param portId output port id
+     * @return the status for the specified output port
+     */
+    public PortStatusDTO getOutputPortStatus(final String groupId, final 
String portId) {
+        final ProcessGroupStatus processGroupStatus = 
flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to 
locate group with id '%s'.", groupId));
+        }
+
+        for (final PortStatus portStatus : 
processGroupStatus.getOutputPortStatus()) {
+            if (portId.equals(portStatus.getId())) {
+                return dtoFactory.createPortStatusDto(portStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate 
output port with id '%s'.", portId));
+    }
+
+    /**
+     * Gets the status for the specified remote process group.
+     *
+     * @param groupId group id
+     * @param remoteProcessGroupId remote process group id
+     * @return the status for the specified remote process group
+     */
+    public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus(final 
String groupId, final String remoteProcessGroupId) {
+        final ProcessGroupStatus processGroupStatus = 
flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to 
locate group with id '%s'.", groupId));
+        }
+
+        for (final RemoteProcessGroupStatus remoteProcessGroupStatus : 
processGroupStatus.getRemoteProcessGroupStatus()) {
+            if (remoteProcessGroupId.equals(remoteProcessGroupStatus.getId())) 
{
+                return 
dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate 
remote process group with id '%s'.", remoteProcessGroupId));
+    }
+
+    /**
      * Gets the BulletinRepository.
      *
      * @return the BulletinRepository

Reply via email to