NIFI-1323
- Starting to handle status merging of individual components.
- Nodewise breakdown has been added to Processors but the remaining components 
still need to be updated.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b5cb5da6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b5cb5da6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b5cb5da6

Branch: refs/heads/NIFI-1563
Commit: b5cb5da68e6944842dda0c01caf28511d6a0daca
Parents: 19e8d7d
Author: Matt Gilman <[email protected]>
Authored: Wed Mar 16 10:01:11 2016 -0400
Committer: Matt Gilman <[email protected]>
Committed: Wed Mar 16 10:01:11 2016 -0400

----------------------------------------------------------------------
 .../nifi/web/api/dto/status/StatusMerger.java   |  30 ++-
 .../cluster/manager/impl/WebClusterManager.java | 163 +++++++++----
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  34 +--
 .../nifi/web/StandardNiFiServiceFacade.java     | 244 ++-----------------
 .../nifi/web/api/ApplicationResource.java       |   2 +
 .../apache/nifi/web/api/ClusterResource.java    | 241 ++----------------
 .../apache/nifi/web/api/ConnectionResource.java |  40 ++-
 .../apache/nifi/web/api/InputPortResource.java  |  48 +++-
 .../org/apache/nifi/web/api/NodeResource.java   |  87 +------
 .../apache/nifi/web/api/OutputPortResource.java |  49 +++-
 .../nifi/web/api/ProcessGroupResource.java      |  73 +++++-
 .../apache/nifi/web/api/ProcessorResource.java  |  45 +++-
 .../web/api/RemoteProcessGroupResource.java     |  44 +++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  27 +-
 .../cluster-connection-summary-dialog.jsp       |   1 +
 .../cluster-input-port-summary-dialog.jsp       |   1 +
 .../cluster-output-port-summary-dialog.jsp      |   1 +
 .../cluster-processor-summary-dialog.jsp        |   1 +
 ...ster-remote-process-group-summary-dialog.jsp |   1 +
 .../src/main/webapp/js/nf/canvas/nf-canvas.js   |   5 +-
 .../src/main/webapp/js/nf/canvas/nf-graph.js    |  26 +-
 .../webapp/js/nf/summary/nf-cluster-search.js   |   8 +-
 .../webapp/js/nf/summary/nf-summary-table.js    | 153 +++++++-----
 .../src/main/webapp/js/nf/summary/nf-summary.js |   4 +-
 24 files changed, 596 insertions(+), 732 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
index cc3c367..4e7c13e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java
@@ -75,13 +75,15 @@ public class StatusMerger {
     public static void merge(final ProcessGroupStatusDTO target, final 
ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, 
final Integer nodeApiPort) {
         merge(target.getAggregateStatus(), toMerge.getAggregateStatus());
 
-        final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new 
NodeProcessGroupStatusSnapshotDTO();
-        nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus());
-        nodeSnapshot.setAddress(nodeAddress);
-        nodeSnapshot.setApiPort(nodeApiPort);
-        nodeSnapshot.setNodeId(nodeId);
-
-        target.getNodeStatuses().add(nodeSnapshot);
+        if (target.getNodeStatuses() != null) {
+            final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new 
NodeProcessGroupStatusSnapshotDTO();
+            nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus());
+            nodeSnapshot.setAddress(nodeAddress);
+            nodeSnapshot.setApiPort(nodeApiPort);
+            nodeSnapshot.setNodeId(nodeId);
+
+            target.getNodeStatuses().add(nodeSnapshot);
+        }
     }
 
     public static void merge(final ProcessGroupStatusSnapshotDTO target, final 
ProcessGroupStatusSnapshotDTO toMerge) {
@@ -252,13 +254,15 @@ public class StatusMerger {
     public static void merge(final ProcessorStatusDTO target, final 
ProcessorStatusDTO toMerge, final String nodeId, final String nodeAddress, 
final Integer nodeApiPort) {
         merge(target.getAggregateStatus(), toMerge.getAggregateStatus());
 
-        final NodeProcessorStatusSnapshotDTO nodeSnapshot = new 
NodeProcessorStatusSnapshotDTO();
-        nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus());
-        nodeSnapshot.setAddress(nodeAddress);
-        nodeSnapshot.setApiPort(nodeApiPort);
-        nodeSnapshot.setNodeId(nodeId);
+        if (target.getNodeStatuses() != null) {
+            final NodeProcessorStatusSnapshotDTO nodeSnapshot = new 
NodeProcessorStatusSnapshotDTO();
+            nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus());
+            nodeSnapshot.setAddress(nodeAddress);
+            nodeSnapshot.setApiPort(nodeApiPort);
+            nodeSnapshot.setNodeId(nodeId);
 
-        target.getNodeStatuses().add(nodeSnapshot);
+            target.getNodeStatuses().add(nodeSnapshot);
+        }
     }
 
     public static void merge(final ProcessorStatusSnapshotDTO target, final 
ProcessorStatusSnapshotDTO toMerge) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 297d43f..5cbdf7a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,57 +16,7 @@
  */
 package org.apache.nifi.cluster.manager.impl;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Queue;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.StreamingOutput;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
@@ -222,9 +172,11 @@ import 
org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusMerger;
@@ -241,6 +193,7 @@ import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
 import org.apache.nifi.web.api.entity.ProvenanceEntity;
 import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
@@ -259,7 +212,55 @@ import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
-import com.sun.jersey.api.client.ClientResponse;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
 
 /**
  * Provides a cluster manager implementation. The manager federates incoming 
HTTP client requests to the nodes' external API using the HTTP protocol. The 
manager also communicates with nodes using the
@@ -319,6 +320,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
     public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
     public static final Pattern PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
+    public static final Pattern PROCESSOR_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status");
     public static final Pattern PROCESSOR_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/state");
     public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
 
@@ -2410,6 +2412,10 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return false;
     }
 
+    private static boolean isProcessorStatusEndpoint(final URI uri, final 
String method) {
+        return "GET".equalsIgnoreCase(method) && 
PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isProcessorStateEndpoint(final URI uri, final 
String method) {
         return "GET".equalsIgnoreCase(method) && 
PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
     }
@@ -2562,7 +2568,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 || isControllerServiceReferenceEndpoint(uri, method) || 
isControllerServiceStateEndpoint(uri, method)
                 || isReportingTasksEndpoint(uri, method) || 
isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, 
method)
                 || isDropRequestEndpoint(uri, method) || 
isListFlowFilesEndpoint(uri, method)
-                || isGroupStatusEndpoint(uri, method) || 
isControllerStatusEndpoint(uri, method)
+                || isGroupStatusEndpoint(uri, method) || 
isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, 
method)
                 || isProcessorStatusHistoryEndpoint(uri, method) || 
isProcessGroupStatusHistoryEndpoint(uri, method)
                 || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || 
isConnectionStatusHistoryEndpoint(uri, method)
                 || isBulletinBoardEndpoint(uri, method);
@@ -2645,6 +2651,31 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     }
 
 
+    private void mergeProcessorStatus(final ProcessorStatusDTO statusDto, 
final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, 
ProcessorStatusDTO> resultMap) {
+        final ProcessorStatusDTO mergedProcessorStatus = statusDto;
+        mergedProcessorStatus.setNodeStatuses(new 
ArrayList<NodeProcessorStatusSnapshotDTO>());
+
+        final NodeProcessorStatusSnapshotDTO nodeSnapshot = new 
NodeProcessorStatusSnapshotDTO();
+        nodeSnapshot.setStatusSnapshot(statusDto.getAggregateStatus().clone());
+        nodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        nodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        nodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedProcessorStatus.getNodeStatuses().add(nodeSnapshot);
+
+        // merge the other nodes
+        for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : 
resultMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ProcessorStatusDTO nodeProcessorStatus = entry.getValue();
+            if (nodeProcessorStatus == statusDto) {
+                continue;
+            }
+
+            StatusMerger.merge(mergedProcessorStatus, nodeProcessorStatus, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
+
     private void mergeControllerStatus(final ControllerStatusDTO statusDto, 
final Map<NodeIdentifier, ControllerStatusDTO> resultMap) {
         ControllerStatusDTO mergedStatus = statusDto;
         for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : 
resultMap.entrySet()) {
@@ -3325,6 +3356,32 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
             // create a new client response
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && 
isProcessorStatusEndpoint(uri, method)) {
+            final ProcessorStatusEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ProcessorStatusEntity.class);
+            final ProcessorStatusDTO statusRequest = 
responseEntity.getProcessorStatus();
+
+            NodeIdentifier nodeIdentifier = null;
+
+            final Map<NodeIdentifier, ProcessorStatusDTO> resultsMap = new 
HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ProcessorStatusEntity nodeResponseEntity;
+                if (nodeResponse == clientResponse) {
+                    nodeIdentifier = nodeResponse.getNodeId();
+                    nodeResponseEntity = responseEntity;
+                } else {
+                    nodeResponseEntity = 
nodeResponse.getClientResponse().getEntity(ProcessorStatusEntity.class);
+                }
+
+                final ProcessorStatusDTO nodeStatus = 
nodeResponseEntity.getProcessorStatus();
+                resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+            }
+            mergeProcessorStatus(statusRequest, nodeIdentifier, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else if (hasSuccessfulClientResponse && isProcessGroupEndpoint(uri, 
method)) {
             final ProcessGroupEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
             final ProcessGroupDTO responseDto = 
responseEntity.getProcessGroup();

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 66455a8..61c6158 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -16,10 +16,6 @@
  */
 package org.apache.nifi.web;
 
-import java.util.Collection;
-import java.util.Date;
-import java.util.Set;
-
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.service.ControllerServiceState;
@@ -65,17 +61,19 @@ import 
org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
-import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
 import org.apache.nifi.web.api.dto.status.PortStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 
+import java.util.Collection;
+import java.util.Date;
+import java.util.Set;
+
 /**
  * Defines the NiFiServiceFacade interface.
  */
@@ -1552,14 +1550,6 @@ public interface NiFiServiceFacade {
     void deleteNode(String nodeId);
 
     /**
-     * Returns the status the specified node id.
-     *
-     * @param nodeId The id of the desired node
-     * @return The node status
-     */
-    NodeStatusDTO getNodeStatus(String nodeId);
-
-    /**
      * Returns the system diagnostics for the specified node id.
      *
      * @param nodeId The id of the desired node
@@ -1574,22 +1564,6 @@ public interface NiFiServiceFacade {
      */
     ClusterStatusDTO getClusterStatus();
 
-    /**
-     * Returns an input port's status for each node connected to the cluster.
-     *
-     * @param inputPortId a port identifier
-     * @return The cluster port status transfer object.
-     */
-    ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId);
-
-    /**
-     * Returns an output port's status for each node connected to the cluster.
-     *
-     * @param outputPortId a port identifier
-     * @return The cluster port status transfer object.
-     */
-    ClusterPortStatusDTO getClusterOutputPortStatus(String outputPortId);
-
     // ----------------------------------------
     // BulletinBoard methods
     // ----------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 037a54d..d9135da 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,25 +16,6 @@
  */
 package org.apache.nifi.web;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.WebApplicationException;
-
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -74,9 +55,7 @@ import 
org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceReference;
 import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
@@ -141,11 +120,9 @@ import 
org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
-import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodePortStatusDTO;
 import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
 import org.apache.nifi.web.api.dto.status.PortStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
@@ -171,6 +148,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.access.AccessDeniedException;
 
+import javax.ws.rs.WebApplicationException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
  */
@@ -2887,209 +2882,6 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         clusterManager.deleteNode(nodeId, userDn);
     }
 
-    private PortStatus findNodeInputPortStatus(final ProcessGroupStatus 
groupStatus, final String inputPortId) {
-        PortStatus portStatus = null;
-
-        for (final PortStatus status : groupStatus.getInputPortStatus()) {
-            if (inputPortId.equals(status.getId())) {
-                portStatus = status;
-                break;
-            }
-        }
-
-        if (portStatus == null) {
-            for (final ProcessGroupStatus status : 
groupStatus.getProcessGroupStatus()) {
-                portStatus = findNodeInputPortStatus(status, inputPortId);
-
-                if (portStatus != null) {
-                    break;
-                }
-            }
-        }
-
-        return portStatus;
-    }
-
-    @Override
-    public ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId) {
-        final ClusterPortStatusDTO clusterInputPortStatusDto = new 
ClusterPortStatusDTO();
-        clusterInputPortStatusDto.setNodePortStatus(new 
ArrayList<NodePortStatusDTO>());
-
-        // set the current time
-        clusterInputPortStatusDto.setStatsLastRefreshed(new Date());
-
-        final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
-        boolean firstNode = true;
-        for (final Node node : nodes) {
-
-            final HeartbeatPayload nodeHeartbeatPayload = 
node.getHeartbeatPayload();
-            if (nodeHeartbeatPayload == null) {
-                continue;
-            }
-
-            final ProcessGroupStatus nodeStats = 
nodeHeartbeatPayload.getProcessGroupStatus();
-            if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
-                continue;
-            }
-
-            // find the input status for this node
-            final PortStatus inputPortStatus = 
findNodeInputPortStatus(nodeStats, inputPortId);
-
-            // sanity check that we have status for this input port
-            if (inputPortStatus == null) {
-                throw new ResourceNotFoundException(String.format("Unable to 
find status for input port id '%s'.", inputPortId));
-            }
-
-            if (firstNode) {
-                clusterInputPortStatusDto.setPortId(inputPortId);
-                
clusterInputPortStatusDto.setPortName(inputPortStatus.getName());
-                firstNode = false;
-            }
-
-            // create node port status dto
-            final NodePortStatusDTO nodeInputPortStatusDTO = new 
NodePortStatusDTO();
-            
clusterInputPortStatusDto.getNodePortStatus().add(nodeInputPortStatusDTO);
-
-            // populate node input port status dto
-            final String nodeId = node.getNodeId().getId();
-            nodeInputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, 
clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
-            
nodeInputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(inputPortStatus));
-        }
-
-        return clusterInputPortStatusDto;
-    }
-
-    private PortStatus findNodeOutputPortStatus(final ProcessGroupStatus 
groupStatus, final String outputPortId) {
-        PortStatus portStatus = null;
-
-        for (final PortStatus status : groupStatus.getOutputPortStatus()) {
-            if (outputPortId.equals(status.getId())) {
-                portStatus = status;
-                break;
-            }
-        }
-
-        if (portStatus == null) {
-            for (final ProcessGroupStatus status : 
groupStatus.getProcessGroupStatus()) {
-                portStatus = findNodeOutputPortStatus(status, outputPortId);
-
-                if (portStatus != null) {
-                    break;
-                }
-            }
-        }
-
-        return portStatus;
-    }
-
-    @Override
-    public ClusterPortStatusDTO getClusterOutputPortStatus(String 
outputPortId) {
-        final ClusterPortStatusDTO clusterOutputPortStatusDto = new 
ClusterPortStatusDTO();
-        clusterOutputPortStatusDto.setNodePortStatus(new 
ArrayList<NodePortStatusDTO>());
-
-        // set the current time
-        clusterOutputPortStatusDto.setStatsLastRefreshed(new Date());
-
-        final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
-        boolean firstNode = true;
-        for (final Node node : nodes) {
-
-            final HeartbeatPayload nodeHeartbeatPayload = 
node.getHeartbeatPayload();
-            if (nodeHeartbeatPayload == null) {
-                continue;
-            }
-
-            final ProcessGroupStatus nodeStats = 
nodeHeartbeatPayload.getProcessGroupStatus();
-            if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
-                continue;
-            }
-
-            // find the output status for this node
-            final PortStatus outputPortStatus = 
findNodeOutputPortStatus(nodeStats, outputPortId);
-
-            // sanity check that we have status for this output port
-            if (outputPortStatus == null) {
-                throw new ResourceNotFoundException(String.format("Unable to 
find status for output port id '%s'.", outputPortId));
-            }
-
-            if (firstNode) {
-                clusterOutputPortStatusDto.setPortId(outputPortId);
-                
clusterOutputPortStatusDto.setPortName(outputPortStatus.getName());
-                firstNode = false;
-            }
-
-            // create node port status dto
-            final NodePortStatusDTO nodeOutputPortStatusDTO = new 
NodePortStatusDTO();
-            
clusterOutputPortStatusDto.getNodePortStatus().add(nodeOutputPortStatusDTO);
-
-            // populate node output port status dto
-            final String nodeId = node.getNodeId().getId();
-            nodeOutputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, 
clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
-            
nodeOutputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(outputPortStatus));
-        }
-
-        return clusterOutputPortStatusDto;
-    }
-
-    private RemoteProcessGroupStatus findNodeRemoteProcessGroupStatus(final 
ProcessGroupStatus groupStatus, final String remoteProcessGroupId) {
-        RemoteProcessGroupStatus remoteProcessGroupStatus = null;
-
-        for (final RemoteProcessGroupStatus status : 
groupStatus.getRemoteProcessGroupStatus()) {
-            if (remoteProcessGroupId.equals(status.getId())) {
-                remoteProcessGroupStatus = status;
-                break;
-            }
-        }
-
-        if (remoteProcessGroupStatus == null) {
-            for (final ProcessGroupStatus status : 
groupStatus.getProcessGroupStatus()) {
-                remoteProcessGroupStatus = 
findNodeRemoteProcessGroupStatus(status, remoteProcessGroupId);
-
-                if (remoteProcessGroupStatus != null) {
-                    break;
-                }
-            }
-        }
-
-        return remoteProcessGroupStatus;
-    }
-
-
-
-    @Override
-    public NodeStatusDTO getNodeStatus(String nodeId) {
-        // find the node in question
-        final Node node = clusterManager.getNode(nodeId);
-
-        // verify node state
-        if (node == null) {
-            throw new UnknownNodeException("Node does not exist.");
-        } else if (Node.Status.CONNECTED != node.getStatus()) {
-            throw new IllegalClusterStateException(
-                    String.format("Node '%s:%s' is not connected to the 
cluster.",
-                            node.getNodeId().getApiAddress(), 
node.getNodeId().getApiPort()));
-        }
-
-        // get the node's last heartbeat
-        final NodeStatusDTO nodeStatus = new NodeStatusDTO();
-        final HeartbeatPayload nodeHeartbeatPayload = 
node.getHeartbeatPayload();
-        if (nodeHeartbeatPayload == null) {
-            return nodeStatus;
-        }
-
-        // get the node status
-        final ProcessGroupStatus nodeProcessGroupStatus = 
nodeHeartbeatPayload.getProcessGroupStatus();
-        if (nodeProcessGroupStatus == null) {
-            return nodeStatus;
-        }
-
-        final ProcessGroupStatusDTO nodeProcessGroupStatusDto = 
dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), 
nodeProcessGroupStatus);
-        nodeStatus.setControllerStatus(nodeProcessGroupStatusDto);
-        nodeStatus.setNode(dtoFactory.createNodeDTO(node, 
clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
-
-        return nodeStatus;
-    }
-
     @Override
     public NodeSystemDiagnosticsDTO getNodeSystemDiagnostics(String nodeId) {
         // find the node in question

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 256d41f..a3774ce 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -77,6 +77,8 @@ public abstract class ApplicationResource {
     private static final int CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES = (int) 
(0.75 * HEADER_BUFFER_SIZE);
     private static final Logger logger = 
LoggerFactory.getLogger(ApplicationResource.class);
 
+    public static final String NODEWISE = "false";
+
     @Context
     private HttpServletRequest httpServletRequest;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
index a989620..bd6a39a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
@@ -16,27 +16,13 @@
  */
 package org.apache.nifi.web.api;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HEAD;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
@@ -51,25 +37,31 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO;
-import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
 import org.apache.nifi.web.api.entity.ClusterEntity;
-import org.apache.nifi.web.api.entity.ClusterPortStatusEntity;
 import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
 import org.apache.nifi.web.api.entity.ClusterStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 import org.springframework.security.access.prepost.PreAuthorize;
 
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HEAD;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * RESTful endpoint for managing a cluster.
@@ -513,193 +505,6 @@ public class ClusterResource extends ApplicationResource {
         throw new IllegalClusterResourceRequestException("Only a node can 
process the request.");
     }
 
-
-    /**
-     * Gets the processor status history for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param id The id of the processor
-     * @return A clusterProcessorStatusHistoryEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/processors/{id}/status/history")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets processor status history across the cluster",
- response = StatusHistoryEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
-                @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized 
to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
-            }
-    )
-    public Response getProcessorStatusHistory(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The processor id",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            final URI uri;
-            try {
-                final URI originalUri = getAbsolutePath();
-                final String newPath = "/nifi-api/processors/" + id + 
"/status/history";
-                uri = new URI(originalUri.getScheme(), 
originalUri.getAuthority(), newPath, originalUri.getQuery(), 
originalUri.getFragment());
-            } catch (final URISyntaxException use) {
-                throw new RuntimeException(use);
-            }
-
-            return clusterManager.applyRequest(HttpMethod.GET, uri, 
getRequestParameters(true), getHeaders()).getResponse();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
-    }
-
-
-
-
-    /**
-     * Gets the input port status for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param id The id of the input port
-     * @return A clusterPortStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/input-ports/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets input port status across the cluster",
-            response = ClusterPortStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
-                @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized 
to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
-            }
-    )
-    public Response getInputPortStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The input port id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-
-            final ClusterPortStatusDTO dto = 
serviceFacade.getClusterInputPortStatus(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterPortStatusEntity entity = new 
ClusterPortStatusEntity();
-            entity.setClusterPortStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
-    }
-
-    /**
-     * Gets the output port status for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param id The id of the output port
-     * @return A clusterPortStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/output-ports/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets output port status across the cluster",
-            response = ClusterPortStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
-                @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized 
to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
-            }
-    )
-    public Response getOutputPortStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The output port id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-
-            final ClusterPortStatusDTO dto = 
serviceFacade.getClusterOutputPortStatus(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterPortStatusEntity entity = new 
ClusterPortStatusEntity();
-            entity.setClusterPortStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
-    }
-
-
     // setters
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index f1d487b..dea0407 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -310,26 +310,60 @@ public class ConnectionResource extends 
ApplicationResource {
             @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
         }
     )
-    public Response getProcessorStatus(
+    public Response getConnectionStatus(
         @ApiParam(
             value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
             required = false
         )
         @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
         @ApiParam(
+            value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
             value = "The connection id.",
             required = true
         )
         @PathParam("id") String id) {
 
-        // replicate if cluster manager
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
         if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
         }
 
         // get the specified connection status
         final ConnectionStatusDTO connectionStatus = 
serviceFacade.getConnectionStatus(groupId, id);
 
+        // prune the response as necessary
+        if (!nodewise) {
+            connectionStatus.setNodeStatuses(null);
+        }
+
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
index 8bf07c6..ba5b761 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
@@ -23,7 +23,10 @@ import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
@@ -64,6 +67,7 @@ import javax.ws.rs.core.Response;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -402,25 +406,59 @@ public class InputPortResource extends 
ApplicationResource {
             @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
         }
     )
-    public Response getProcessorStatus(
+    public Response getInputPortStatus(
         @ApiParam(
             value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
             required = false
         )
         @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
         @ApiParam(
+            value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
             value = "The input port id.",
             required = true
         )
         @PathParam("id") String id) {
 
-        // replicate if cluster manager
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
         if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
         }
 
         // get the specified input port status
-        final PortStatusDTO processorStatus = 
serviceFacade.getInputPortStatus(groupId, id);
+        final PortStatusDTO portStatus = 
serviceFacade.getInputPortStatus(groupId, id);
+
+        // prune the response as necessary
+        if (!nodewise) {
+            portStatus.setNodeStatuses(null);
+        }
 
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
@@ -429,7 +467,7 @@ public class InputPortResource extends ApplicationResource {
         // generate the response entity
         final PortStatusEntity entity = new PortStatusEntity();
         entity.setRevision(revision);
-        entity.setPortStatus(processorStatus);
+        entity.setPortStatus(portStatus);
 
         // generate the response
         return clusterContext(generateOkResponse(entity)).build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
index c88cc68..e4258f6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
@@ -22,6 +22,18 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.IllegalClusterResourceRequestException;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.NodeEntity;
+import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.springframework.security.access.prepost.PreAuthorize;
+
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -34,19 +46,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.api.dto.NodeDTO;
-import org.apache.nifi.web.api.entity.NodeEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
-import org.springframework.security.access.prepost.PreAuthorize;
 
 /**
  * RESTful endpoint for managing a cluster connection.
@@ -122,68 +121,6 @@ public class NodeResource extends ApplicationResource {
     }
 
     /**
-     * Gets the status for the specified node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param id The id of the node.
-     * @return A processGroupStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets process group status for a node in the cluster",
-            response = ProcessGroupStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
-                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
-                @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized 
to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
-            }
-    )
-    public Response getNodeStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The node id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            // get the node statistics
-            final NodeStatusDTO nodeStatus = serviceFacade.getNodeStatus(id);
-
-            // create the revision
-            final RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create the node statics entity
-            final ProcessGroupStatusEntity entity = new 
ProcessGroupStatusEntity();
-            entity.setRevision(revision);
-            entity.setProcessGroupStatus(nodeStatus.getControllerStatus());
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster 
manager can process the request.");
-    }
-
-    /**
      * Gets the system diagnositics for the specified node.
      *
      * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
index f72d3e7..450b005 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
@@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -46,7 +47,11 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
@@ -401,25 +406,59 @@ public class OutputPortResource extends 
ApplicationResource {
             @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
         }
     )
-    public Response getProcessorStatus(
+    public Response getOutputPortStatus(
         @ApiParam(
             value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
             required = false
         )
         @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
         @ApiParam(
+            value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
             value = "The output port id.",
             required = true
         )
         @PathParam("id") String id) {
 
-        // replicate if cluster manager
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
         if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
         }
 
         // get the specified output port status
-        final PortStatusDTO processorStatus = 
serviceFacade.getOutputPortStatus(groupId, id);
+        final PortStatusDTO portStatus = 
serviceFacade.getOutputPortStatus(groupId, id);
+
+        // prune the response as necessary
+        if (!nodewise) {
+            portStatus.setNodeStatuses(null);
+        }
 
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
@@ -428,7 +467,7 @@ public class OutputPortResource extends ApplicationResource 
{
         // generate the response entity
         final PortStatusEntity entity = new PortStatusEntity();
         entity.setRevision(revision);
-        entity.setPortStatus(processorStatus);
+        entity.setPortStatus(portStatus);
 
         // generate the response
         return clusterContext(generateOkResponse(entity)).build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index cd4bd73..8b6c4b4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.api;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -42,7 +43,10 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
@@ -1254,7 +1258,7 @@ public class ProcessGroupResource extends 
ApplicationResource {
      * Retrieves the status report for this NiFi.
      *
      * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
-     * @param recursive Optional recursive flag that defaults to false. If set 
to true, all descendent groups and their content will be included if the 
verbose flag is also set to true.
+     * @param recursive Optional recursive flag that defaults to false. If set 
to true, all descendant groups and the status of their content will be included.
      * @return A processGroupStatusEntity.
      */
     @GET
@@ -1284,21 +1288,66 @@ public class ProcessGroupResource extends 
ApplicationResource {
             }
     )
     public Response getProcessGroupStatus(
+            @ApiParam(
+                value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
+                required = false
+            )
             @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
-            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean 
recursive) {
+            @ApiParam(
+                value = "Whether all descendant groups and the status of their 
content will be included. Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean 
recursive,
+            @ApiParam(
+                value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                value = "The id of the node where to get the status.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
 
         if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
         }
 
         // get the status
         final ProcessGroupStatusDTO statusReport = 
serviceFacade.getProcessGroupStatus(groupId);
 
         // prune the response as necessary
+        if (!nodewise) {
+            statusReport.setNodeStatuses(null);
+        }
+
+        // when not recursive, strip the contents of the child process groups
         if (!recursive) {
             prune(statusReport.getAggregateStatus());
-            for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : 
statusReport.getNodeStatuses()) {
-                prune(nodeSnapshot.getStatusSnapshot());
+            if (statusReport.getNodeStatuses() != null) {
+                for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : 
statusReport.getNodeStatuses()) {
+                    prune(nodeSnapshot.getStatusSnapshot());
+                }
             }
         }
 
@@ -1316,12 +1365,14 @@ public class ProcessGroupResource extends 
ApplicationResource {
     }
 
     private void prune(final ProcessGroupStatusSnapshotDTO snapshot) {
-        snapshot.setConnectionStatusSnapshots(null);
-        snapshot.setProcessGroupStatusSnapshots(null);
-        snapshot.setInputPortStatusSnapshots(null);
-        snapshot.setOutputPortStatusSnapshots(null);
-        snapshot.setProcessorStatusSnapshots(null);
-        snapshot.setRemoteProcessGroupStatusSnapshots(null);
+        for (final ProcessGroupStatusSnapshotDTO childProcessGroupStatus : 
snapshot.getProcessGroupStatusSnapshots()) {
+            childProcessGroupStatus.setConnectionStatusSnapshots(null);
+            childProcessGroupStatus.setProcessGroupStatusSnapshots(null);
+            childProcessGroupStatus.setInputPortStatusSnapshots(null);
+            childProcessGroupStatus.setOutputPortStatusSnapshots(null);
+            childProcessGroupStatus.setProcessorStatusSnapshots(null);
+            childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 4ccfb61..683d01b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -23,9 +23,12 @@ import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.ui.extension.UiExtension;
 import org.apache.nifi.ui.extension.UiExtensionMapping;
@@ -77,6 +80,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -453,14 +457,51 @@ public class ProcessorResource extends 
ApplicationResource {
         )
         @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
         @ApiParam(
+            value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
             value = "The processor id.",
             required = true
         )
         @PathParam("id") String id) {
 
-        // replicate if cluster manager
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
         if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = 
clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), 
getRequestParameters(true), getHeaders());
+                final ProcessorStatusEntity entity = (ProcessorStatusEntity) 
nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and 
prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getProcessorStatus().setNodeStatuses(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
         }
 
         // get the specified processor status

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index f03af44..19b29c4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -23,7 +23,10 @@ import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
@@ -67,6 +70,7 @@ import javax.ws.rs.core.Response;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -283,26 +287,60 @@ public class RemoteProcessGroupResource extends 
ApplicationResource {
             @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
         }
     )
-    public Response getProcessorStatus(
+    public Response getRemoteProcessGroupStatus(
         @ApiParam(
             value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
             required = false
         )
         @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
         @ApiParam(
+            value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
             value = "The remote process group id.",
             required = true
         )
         @PathParam("id") String id) {
 
-        // replicate if cluster manager
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be 
directed at a specific node.");
+        }
+
         if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node 
does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse();
+            }
         }
 
         // get the specified remote process group status
         final RemoteProcessGroupStatusDTO remoteProcessGroupStatus = 
serviceFacade.getRemoteProcessGroupStatus(groupId, id);
 
+        // prune the response as necessary
+        if (!nodewise) {
+            remoteProcessGroupStatus.setNodeStatuses(null);
+        }
+
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());

Reply via email to