NIFI-1323 - Starting to handle status merging of individual components. - Nodewise breakdown has been added to Processors, but the remaining components still need to be updated.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b5cb5da6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b5cb5da6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b5cb5da6 Branch: refs/heads/NIFI-1563 Commit: b5cb5da68e6944842dda0c01caf28511d6a0daca Parents: 19e8d7d Author: Matt Gilman <[email protected]> Authored: Wed Mar 16 10:01:11 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Wed Mar 16 10:01:11 2016 -0400 ---------------------------------------------------------------------- .../nifi/web/api/dto/status/StatusMerger.java | 30 ++- .../cluster/manager/impl/WebClusterManager.java | 163 +++++++++---- .../org/apache/nifi/web/NiFiServiceFacade.java | 34 +-- .../nifi/web/StandardNiFiServiceFacade.java | 244 ++----------------- .../nifi/web/api/ApplicationResource.java | 2 + .../apache/nifi/web/api/ClusterResource.java | 241 ++---------------- .../apache/nifi/web/api/ConnectionResource.java | 40 ++- .../apache/nifi/web/api/InputPortResource.java | 48 +++- .../org/apache/nifi/web/api/NodeResource.java | 87 +------ .../apache/nifi/web/api/OutputPortResource.java | 49 +++- .../nifi/web/api/ProcessGroupResource.java | 73 +++++- .../apache/nifi/web/api/ProcessorResource.java | 45 +++- .../web/api/RemoteProcessGroupResource.java | 44 +++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 27 +- .../cluster-connection-summary-dialog.jsp | 1 + .../cluster-input-port-summary-dialog.jsp | 1 + .../cluster-output-port-summary-dialog.jsp | 1 + .../cluster-processor-summary-dialog.jsp | 1 + ...ster-remote-process-group-summary-dialog.jsp | 1 + .../src/main/webapp/js/nf/canvas/nf-canvas.js | 5 +- .../src/main/webapp/js/nf/canvas/nf-graph.js | 26 +- .../webapp/js/nf/summary/nf-cluster-search.js | 8 +- .../webapp/js/nf/summary/nf-summary-table.js | 153 +++++++----- .../src/main/webapp/js/nf/summary/nf-summary.js | 4 +- 24 files changed, 596 insertions(+), 732 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java index cc3c367..4e7c13e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java @@ -75,13 +75,15 @@ public class StatusMerger { public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { merge(target.getAggregateStatus(), toMerge.getAggregateStatus()); - final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); - nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); - nodeSnapshot.setAddress(nodeAddress); - nodeSnapshot.setApiPort(nodeApiPort); - nodeSnapshot.setNodeId(nodeId); - - target.getNodeStatuses().add(nodeSnapshot); + if (target.getNodeStatuses() != null) { + final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeStatuses().add(nodeSnapshot); + } } public static void merge(final ProcessGroupStatusSnapshotDTO target, 
final ProcessGroupStatusSnapshotDTO toMerge) { @@ -252,13 +254,15 @@ public class StatusMerger { public static void merge(final ProcessorStatusDTO target, final ProcessorStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { merge(target.getAggregateStatus(), toMerge.getAggregateStatus()); - final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO(); - nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); - nodeSnapshot.setAddress(nodeAddress); - nodeSnapshot.setApiPort(nodeApiPort); - nodeSnapshot.setNodeId(nodeId); + if (target.getNodeStatuses() != null) { + final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); - target.getNodeStatuses().add(nodeSnapshot); + target.getNodeStatuses().add(nodeSnapshot); + } } public static void merge(final ProcessorStatusSnapshotDTO target, final ProcessorStatusSnapshotDTO toMerge) { http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 297d43f..5cbdf7a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -16,57 +16,7 @@ */ package org.apache.nifi.cluster.manager.impl; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.Serializable; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Queue; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; - -import javax.net.ssl.SSLContext; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.StreamingOutput; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.dom.DOMSource; -import 
javax.xml.transform.stream.StreamResult; - +import com.sun.jersey.api.client.ClientResponse; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnAdded; @@ -222,9 +172,11 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusMerger; @@ -241,6 +193,7 @@ import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProcessorStatusEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.ProvenanceEventEntity; @@ -259,7 +212,55 @@ import org.w3c.dom.NodeList; import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; -import com.sun.jersey.api.client.ClientResponse; +import javax.net.ssl.SSLContext; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.StreamingOutput; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import 
javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Queue; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; /** * Provides a cluster manager implementation. The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. 
The manager also communicates with nodes using the @@ -319,6 +320,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}"); + public static final Pattern PROCESSOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status"); public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/state"); public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}"); @@ -2410,6 +2412,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } + private static boolean isProcessorStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isProcessorStateEndpoint(final URI uri, final String method) { return "GET".equalsIgnoreCase(method) && PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); } @@ -2562,7 +2568,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method) || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method) - || isGroupStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) + || isGroupStatusEndpoint(uri, 
method) || isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method) || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method) || isBulletinBoardEndpoint(uri, method); @@ -2645,6 +2651,31 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } + private void mergeProcessorStatus(final ProcessorStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessorStatusDTO> resultMap) { + final ProcessorStatusDTO mergedProcessorStatus = statusDto; + mergedProcessorStatus.setNodeStatuses(new ArrayList<NodeProcessorStatusSnapshotDTO>()); + + final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(statusDto.getAggregateStatus().clone()); + nodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + nodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + nodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedProcessorStatus.getNodeStatuses().add(nodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessorStatusDTO nodeProcessorStatus = entry.getValue(); + if (nodeProcessorStatus == statusDto) { + continue; + } + + StatusMerger.merge(mergedProcessorStatus, nodeProcessorStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergeControllerStatus(final ControllerStatusDTO statusDto, final Map<NodeIdentifier, ControllerStatusDTO> resultMap) { ControllerStatusDTO mergedStatus = statusDto; for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : resultMap.entrySet()) { @@ -3325,6 +3356,32 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // create a new client 
response clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isProcessorStatusEndpoint(uri, method)) { + final ProcessorStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorStatusEntity.class); + final ProcessorStatusDTO statusRequest = responseEntity.getProcessorStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, ProcessorStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ProcessorStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessorStatusEntity.class); + } + + final ProcessorStatusDTO nodeStatus = nodeResponseEntity.getProcessorStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeProcessorStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); } else if (hasSuccessfulClientResponse && isProcessGroupEndpoint(uri, method)) { final ProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupEntity.class); final ProcessGroupDTO responseDto = responseEntity.getProcessGroup(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 66455a8..61c6158 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,10 +16,6 @@ */ package org.apache.nifi.web; -import java.util.Collection; -import java.util.Date; -import java.util.Set; - import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; @@ -65,17 +61,19 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; -import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusDTO; import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import java.util.Collection; +import java.util.Date; +import java.util.Set; + /** * Defines the NiFiServiceFacade interface. */ @@ -1552,14 +1550,6 @@ public interface NiFiServiceFacade { void deleteNode(String nodeId); /** - * Returns the status the specified node id. 
- * - * @param nodeId The id of the desired node - * @return The node status - */ - NodeStatusDTO getNodeStatus(String nodeId); - - /** * Returns the system diagnostics for the specified node id. * * @param nodeId The id of the desired node @@ -1574,22 +1564,6 @@ public interface NiFiServiceFacade { */ ClusterStatusDTO getClusterStatus(); - /** - * Returns an input port's status for each node connected to the cluster. - * - * @param inputPortId a port identifier - * @return The cluster port status transfer object. - */ - ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId); - - /** - * Returns an output port's status for each node connected to the cluster. - * - * @param outputPortId a port identifier - * @return The cluster port status transfer object. - */ - ClusterPortStatusDTO getClusterOutputPortStatus(String outputPortId); - // ---------------------------------------- // BulletinBoard methods // ---------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 037a54d..d9135da 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,25 +16,6 @@ */ package org.apache.nifi.web; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import 
java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import javax.ws.rs.WebApplicationException; - import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -74,9 +55,7 @@ import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; @@ -141,11 +120,9 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; -import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; -import org.apache.nifi.web.api.dto.status.NodePortStatusDTO; import org.apache.nifi.web.api.dto.status.NodeStatusDTO; import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; @@ -171,6 +148,24 @@ import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.AccessDeniedException; +import javax.ws.rs.WebApplicationException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + /** * Implementation of NiFiServiceFacade that performs revision checking. */ @@ -2887,209 +2882,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { clusterManager.deleteNode(nodeId, userDn); } - private PortStatus findNodeInputPortStatus(final ProcessGroupStatus groupStatus, final String inputPortId) { - PortStatus portStatus = null; - - for (final PortStatus status : groupStatus.getInputPortStatus()) { - if (inputPortId.equals(status.getId())) { - portStatus = status; - break; - } - } - - if (portStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - portStatus = findNodeInputPortStatus(status, inputPortId); - - if (portStatus != null) { - break; - } - } - } - - return portStatus; - } - - @Override - public ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId) { - final ClusterPortStatusDTO clusterInputPortStatusDto = new ClusterPortStatusDTO(); - clusterInputPortStatusDto.setNodePortStatus(new ArrayList<NodePortStatusDTO>()); - - // set the current time - clusterInputPortStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - 
continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // find the input status for this node - final PortStatus inputPortStatus = findNodeInputPortStatus(nodeStats, inputPortId); - - // sanity check that we have status for this input port - if (inputPortStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for input port id '%s'.", inputPortId)); - } - - if (firstNode) { - clusterInputPortStatusDto.setPortId(inputPortId); - clusterInputPortStatusDto.setPortName(inputPortStatus.getName()); - firstNode = false; - } - - // create node port status dto - final NodePortStatusDTO nodeInputPortStatusDTO = new NodePortStatusDTO(); - clusterInputPortStatusDto.getNodePortStatus().add(nodeInputPortStatusDTO); - - // populate node input port status dto - final String nodeId = node.getNodeId().getId(); - nodeInputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeInputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(inputPortStatus)); - } - - return clusterInputPortStatusDto; - } - - private PortStatus findNodeOutputPortStatus(final ProcessGroupStatus groupStatus, final String outputPortId) { - PortStatus portStatus = null; - - for (final PortStatus status : groupStatus.getOutputPortStatus()) { - if (outputPortId.equals(status.getId())) { - portStatus = status; - break; - } - } - - if (portStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - portStatus = findNodeOutputPortStatus(status, outputPortId); - - if (portStatus != null) { - break; - } - } - } - - return portStatus; - } - - @Override - public ClusterPortStatusDTO getClusterOutputPortStatus(String outputPortId) { - final ClusterPortStatusDTO clusterOutputPortStatusDto = new ClusterPortStatusDTO(); - 
clusterOutputPortStatusDto.setNodePortStatus(new ArrayList<NodePortStatusDTO>()); - - // set the current time - clusterOutputPortStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // find the output status for this node - final PortStatus outputPortStatus = findNodeOutputPortStatus(nodeStats, outputPortId); - - // sanity check that we have status for this output port - if (outputPortStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for output port id '%s'.", outputPortId)); - } - - if (firstNode) { - clusterOutputPortStatusDto.setPortId(outputPortId); - clusterOutputPortStatusDto.setPortName(outputPortStatus.getName()); - firstNode = false; - } - - // create node port status dto - final NodePortStatusDTO nodeOutputPortStatusDTO = new NodePortStatusDTO(); - clusterOutputPortStatusDto.getNodePortStatus().add(nodeOutputPortStatusDTO); - - // populate node output port status dto - final String nodeId = node.getNodeId().getId(); - nodeOutputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeOutputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(outputPortStatus)); - } - - return clusterOutputPortStatusDto; - } - - private RemoteProcessGroupStatus findNodeRemoteProcessGroupStatus(final ProcessGroupStatus groupStatus, final String remoteProcessGroupId) { - RemoteProcessGroupStatus remoteProcessGroupStatus = null; - - for (final RemoteProcessGroupStatus status : groupStatus.getRemoteProcessGroupStatus()) { - if 
(remoteProcessGroupId.equals(status.getId())) { - remoteProcessGroupStatus = status; - break; - } - } - - if (remoteProcessGroupStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - remoteProcessGroupStatus = findNodeRemoteProcessGroupStatus(status, remoteProcessGroupId); - - if (remoteProcessGroupStatus != null) { - break; - } - } - } - - return remoteProcessGroupStatus; - } - - - - @Override - public NodeStatusDTO getNodeStatus(String nodeId) { - // find the node in question - final Node node = clusterManager.getNode(nodeId); - - // verify node state - if (node == null) { - throw new UnknownNodeException("Node does not exist."); - } else if (Node.Status.CONNECTED != node.getStatus()) { - throw new IllegalClusterStateException( - String.format("Node '%s:%s' is not connected to the cluster.", - node.getNodeId().getApiAddress(), node.getNodeId().getApiPort())); - } - - // get the node's last heartbeat - final NodeStatusDTO nodeStatus = new NodeStatusDTO(); - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - return nodeStatus; - } - - // get the node status - final ProcessGroupStatus nodeProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeProcessGroupStatus == null) { - return nodeStatus; - } - - final ProcessGroupStatusDTO nodeProcessGroupStatusDto = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), nodeProcessGroupStatus); - nodeStatus.setControllerStatus(nodeProcessGroupStatusDto); - nodeStatus.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - - return nodeStatus; - } - @Override public NodeSystemDiagnosticsDTO getNodeSystemDiagnostics(String nodeId) { // find the node in question 
http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 256d41f..a3774ce 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -77,6 +77,8 @@ public abstract class ApplicationResource { private static final int CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES = (int) (0.75 * HEADER_BUFFER_SIZE); private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class); + public static final String NODEWISE = "false"; + @Context private HttpServletRequest httpServletRequest; http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java index a989620..bd6a39a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java @@ -16,27 +16,13 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.FormParam; -import javax.ws.rs.GET; -import javax.ws.rs.HEAD; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - +import com.sun.jersey.api.core.ResourceContext; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -51,25 +37,31 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO; -import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; import org.apache.nifi.web.api.entity.ClusterEntity; -import org.apache.nifi.web.api.entity.ClusterPortStatusEntity; import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity; import org.apache.nifi.web.api.entity.ClusterStatusEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; -import 
org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.LongParameter; import org.springframework.security.access.prepost.PreAuthorize; -import com.sun.jersey.api.core.ResourceContext; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.FormParam; +import javax.ws.rs.GET; +import javax.ws.rs.HEAD; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.ArrayList; +import java.util.List; /** * RESTful endpoint for managing a cluster. @@ -513,193 +505,6 @@ public class ClusterResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a node can process the request."); } - - /** - * Gets the processor status history for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the processor - * @return A clusterProcessorStatusHistoryEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/processors/{id}/status/history") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets processor status history across the cluster", - response = StatusHistoryEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getProcessorStatusHistory( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The processor id", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - final URI uri; - try { - final URI originalUri = getAbsolutePath(); - final String newPath = "/nifi-api/processors/" + id + "/status/history"; - uri = new URI(originalUri.getScheme(), originalUri.getAuthority(), newPath, originalUri.getQuery(), originalUri.getFragment()); - } catch (final URISyntaxException use) { - throw new RuntimeException(use); - } - - return clusterManager.applyRequest(HttpMethod.GET, uri, getRequestParameters(true), getHeaders()).getResponse(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - - - - /** - * Gets the input port status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the input port - * @return A clusterPortStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/input-ports/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets input port status across the cluster", - response = ClusterPortStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getInputPortStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The input port id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterPortStatusDTO dto = serviceFacade.getClusterInputPortStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterPortStatusEntity entity = new ClusterPortStatusEntity(); - entity.setClusterPortStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** - * Gets the output port status for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the output port - * @return A clusterPortStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/output-ports/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets output port status across the cluster", - response = ClusterPortStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getOutputPortStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The output port id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - - final ClusterPortStatusDTO dto = serviceFacade.getClusterOutputPortStatus(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterPortStatusEntity entity = new ClusterPortStatusEntity(); - entity.setClusterPortStatus(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade; http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index f1d487b..dea0407 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -310,26 +310,60 @@ public class ConnectionResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request 
was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") } ) - public Response getProcessorStatus( + public Response getConnectionStatus( @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @ApiParam( + value = "Whether or not to include the breakdown per node. Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( value = "The connection id.", required = true ) @PathParam("id") String id) { - // replicate if cluster manager + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + // determine where this request should be sent + if (clusterNodeId == null) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), 
targetNodes).getResponse(); + } } // get the specified connection status final ConnectionStatusDTO connectionStatus = serviceFacade.getConnectionStatus(groupId, id); + // prune the response as necessary + if (!nodewise) { + connectionStatus.setNodeStatuses(null); + } + // create the revision final RevisionDTO revision = new RevisionDTO(); revision.setClientId(clientId.getClientId()); http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java index 8bf07c6..ba5b761 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java @@ -23,7 +23,10 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.NiFiServiceFacade; @@ -64,6 +67,7 @@ import javax.ws.rs.core.Response; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; +import 
java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -402,25 +406,59 @@ public class InputPortResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") } ) - public Response getProcessorStatus( + public Response getInputPortStatus( @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @ApiParam( + value = "Whether or not to include the breakdown per node. Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( value = "The input port id.", required = true ) @PathParam("id") String id) { - // replicate if cluster manager + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + // determine where this request should be sent + if (clusterNodeId == null) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new 
HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } } // get the specified input port status - final PortStatusDTO processorStatus = serviceFacade.getInputPortStatus(groupId, id); + final PortStatusDTO portStatus = serviceFacade.getInputPortStatus(groupId, id); + + // prune the response as necessary + if (!nodewise) { + portStatus.setNodeStatuses(null); + } // create the revision final RevisionDTO revision = new RevisionDTO(); @@ -429,7 +467,7 @@ public class InputPortResource extends ApplicationResource { // generate the response entity final PortStatusEntity entity = new PortStatusEntity(); entity.setRevision(revision); - entity.setPortStatus(processorStatus); + entity.setPortStatus(portStatus); // generate the response return clusterContext(generateOkResponse(entity)).build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java index c88cc68..e4258f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java @@ -22,6 +22,18 @@ import com.wordnik.swagger.annotations.ApiParam; import com.wordnik.swagger.annotations.ApiResponse; import 
com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.IllegalClusterResourceRequestException; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.entity.NodeEntity; +import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.springframework.security.access.prepost.PreAuthorize; + import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -34,19 +46,6 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.api.dto.NodeDTO; -import org.apache.nifi.web.api.entity.NodeEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.web.IllegalClusterResourceRequestException; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusDTO; -import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity; -import org.springframework.security.access.prepost.PreAuthorize; /** * RESTful endpoint for managing a cluster connection. @@ -122,68 +121,6 @@ public class NodeResource extends ApplicationResource { } /** - * Gets the status for the specified node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. 
This value (whether specified or generated) is included in the response. - * @param id The id of the node. - * @return A processGroupStatusEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - @Path("/{id}/status") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets process group status for a node in the cluster", - response = ProcessGroupStatusEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), - @Authorization(value = "Administrator", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getNodeStatus( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The node id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - // get the node statistics - final NodeStatusDTO nodeStatus = serviceFacade.getNodeStatus(id); - - // create the revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create the node statics entity - final ProcessGroupStatusEntity entity = new ProcessGroupStatusEntity(); - entity.setRevision(revision); - entity.setProcessGroupStatus(nodeStatus.getControllerStatus()); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** * Gets the system diagnositics for the specified node. * * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java index f72d3e7..450b005 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java @@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -46,7 +47,11 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; + +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.NiFiServiceFacade; @@ -401,25 +406,59 @@ public class OutputPortResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") } ) - public Response getProcessorStatus( + public Response getOutputPortStatus( @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @ApiParam( + value = "Whether or not to include the breakdown per node. Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( value = "The output port id.", required = true ) @PathParam("id") String id) { - // replicate if cluster manager + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + // determine where this request should be sent + if (clusterNodeId == null) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } } // get the specified output port 
status - final PortStatusDTO processorStatus = serviceFacade.getOutputPortStatus(groupId, id); + final PortStatusDTO portStatus = serviceFacade.getOutputPortStatus(groupId, id); + + // prune the response as necessary + if (!nodewise) { + portStatus.setNodeStatuses(null); + } // create the revision final RevisionDTO revision = new RevisionDTO(); @@ -428,7 +467,7 @@ public class OutputPortResource extends ApplicationResource { // generate the response entity final PortStatusEntity entity = new PortStatusEntity(); entity.setRevision(revision); - entity.setPortStatus(processorStatus); + entity.setPortStatus(portStatus); // generate the response return clusterContext(generateOkResponse(entity)).build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index cd4bd73..8b6c4b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -19,6 +19,7 @@ package org.apache.nifi.web.api; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -42,7 +43,10 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; +import 
org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.NiFiServiceFacade; @@ -1254,7 +1258,7 @@ public class ProcessGroupResource extends ApplicationResource { * Retrieves the status report for this NiFi. * * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param recursive Optional recursive flag that defaults to false. If set to true, all descendent groups and their content will be included if the verbose flag is also set to true. + * @param recursive Optional recursive flag that defaults to false. If set to true, all descendant groups and the status of their content will be included. * @return A processGroupStatusEntity. */ @GET @@ -1284,21 +1288,66 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response getProcessGroupStatus( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive) { + @ApiParam( + value = "Whether all descendant groups and the status of their content will be included. Optional, defaults to false", + required = false + ) + @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive, + @ApiParam( + value = "Whether or not to include the breakdown per node. 
Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId) { + + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + // determine where this request should be sent + if (clusterNodeId == null) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } } // get the status final ProcessGroupStatusDTO statusReport = serviceFacade.getProcessGroupStatus(groupId); // prune the response as necessary + if (!nodewise) { + statusReport.setNodeStatuses(null); + } + + // prune the response as necessary if (!recursive) { prune(statusReport.getAggregateStatus()); - for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : statusReport.getNodeStatuses()) { - prune(nodeSnapshot.getStatusSnapshot()); + if (statusReport.getNodeStatuses() != null) { + for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : statusReport.getNodeStatuses()) { + 
prune(nodeSnapshot.getStatusSnapshot()); + } } } @@ -1316,12 +1365,14 @@ public class ProcessGroupResource extends ApplicationResource { } private void prune(final ProcessGroupStatusSnapshotDTO snapshot) { - snapshot.setConnectionStatusSnapshots(null); - snapshot.setProcessGroupStatusSnapshots(null); - snapshot.setInputPortStatusSnapshots(null); - snapshot.setOutputPortStatusSnapshots(null); - snapshot.setProcessorStatusSnapshots(null); - snapshot.setRemoteProcessGroupStatusSnapshots(null); + for (final ProcessGroupStatusSnapshotDTO childProcessGroupStatus : snapshot.getProcessGroupStatusSnapshots()) { + childProcessGroupStatus.setConnectionStatusSnapshots(null); + childProcessGroupStatus.setProcessGroupStatusSnapshots(null); + childProcessGroupStatus.setInputPortStatusSnapshots(null); + childProcessGroupStatus.setOutputPortStatusSnapshots(null); + childProcessGroupStatus.setProcessorStatusSnapshots(null); + childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null); + } } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java index 4ccfb61..683d01b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java @@ -23,9 +23,12 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; 
import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.ui.extension.UiExtension; import org.apache.nifi.ui.extension.UiExtensionMapping; @@ -77,6 +80,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -453,14 +457,51 @@ public class ProcessorResource extends ApplicationResource { ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @ApiParam( + value = "Whether or not to include the breakdown per node. 
Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( value = "The processor id.", required = true ) @PathParam("id") String id) { - // replicate if cluster manager + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + // determine where this request should be sent + if (clusterNodeId == null) { + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final ProcessorStatusEntity entity = (ProcessorStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getProcessorStatus().setNodeStatuses(null); + } + + return nodeResponse.getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } } // get the specified processor status 
http://git-wip-us.apache.org/repos/asf/nifi/blob/b5cb5da6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java index f03af44..19b29c4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java @@ -23,7 +23,10 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.NiFiServiceFacade; @@ -67,6 +70,7 @@ import javax.ws.rs.core.Response; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -283,26 +287,60 @@ public class RemoteProcessGroupResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") } ) - public Response getProcessorStatus( + public Response getRemoteProcessGroupStatus( @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @ApiParam( + value = "Whether or not to include the breakdown per node. Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the status.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( value = "The remote process group id.", required = true ) @PathParam("id") String id) { - // replicate if cluster manager + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + // determine where this request should be sent + if (clusterNodeId == null) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } else { + // get the target node and ensure it exists + final Node targetNode = clusterManager.getNode(clusterNodeId); + if (targetNode == null) { + throw new UnknownNodeException("The specified cluster node does not exist."); + } + + final Set<NodeIdentifier> targetNodes = new HashSet<>(); + targetNodes.add(targetNode.getNodeId()); + + // replicate the request to the specific node + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse(); + } } // get the 
specified remote process group status final RemoteProcessGroupStatusDTO remoteProcessGroupStatus = serviceFacade.getRemoteProcessGroupStatus(groupId, id); + // prune the response as necessary + if (!nodewise) { + remoteProcessGroupStatus.setNodeStatuses(null); + } + // create the revision final RevisionDTO revision = new RevisionDTO(); revision.setClientId(clientId.getClientId());
