http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 945c671..b981bde 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -27,11 +27,8 @@ import org.apache.nifi.admin.service.AccountNotFoundException; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.admin.service.UserService; import org.apache.nifi.authorization.Authority; -import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextThreadLocal; -import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; -import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -55,11 +52,6 @@ import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.controller.status.ConnectionStatus; -import org.apache.nifi.controller.status.PortStatus; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; @@ -72,7 +64,6 @@ import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.user.AccountStatus; import org.apache.nifi.user.NiFiUser; import org.apache.nifi.user.NiFiUserGroup; @@ -91,6 +82,7 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; +import org.apache.nifi.web.api.dto.CountersSnapshotDTO; import org.apache.nifi.web.api.dto.DocumentedTypeDTO; import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.DtoFactory; @@ -100,7 +92,6 @@ import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.NodeDTO; -import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.PreviousValueDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; @@ -125,21 +116,12 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; -import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO; -import org.apache.nifi.web.api.dto.status.NodePortStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.ConnectionDAO; @@ -393,7 +375,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<ConnectionDTO>() { @Override public boolean isNew() { return false; @@ -424,7 +406,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<ProcessorDTO>() { @Override public boolean isNew() { return false; @@ -455,7 +437,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<LabelDTO>() { @Override public boolean isNew() { return false; @@ -486,7 +468,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<FunnelDTO>() { @Override public boolean isNew() { return false; @@ -532,7 +514,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { + return new ConfigurationResult<SnippetDTO>() { @Override public boolean isNew() { return false; @@ -562,7 +544,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<PortDTO>() { @Override public boolean isNew() { return false; @@ -592,7 +574,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<PortDTO>() { @Override public boolean isNew() { return false; @@ -622,7 +604,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<RemoteProcessGroupDTO>() { @Override public boolean isNew() { return false; @@ -649,7 +631,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<RemoteProcessGroupPortDTO>() { @Override public boolean isNew() { return false; @@ -676,7 +658,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<RemoteProcessGroupPortDTO>() { @Override public boolean isNew() { return false; @@ -711,7 +693,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<ProcessGroupDTO>() { @Override public boolean isNew() { return false; @@ -751,7 +733,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<ControllerConfigurationDTO>() { @Override public boolean isNew() { return false; @@ -808,14 +790,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // clear the state for the specified component processorDAO.clearState(groupId, processorId); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -836,14 +818,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // clear the state for the specified component controllerServiceDAO.clearState(controllerServiceId); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -864,14 +846,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // clear the state for the specified component reportingTaskDAO.clearState(reportingTaskId); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -889,14 +871,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -936,14 +918,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -962,14 +944,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -988,14 +970,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -1025,14 +1007,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -1050,14 +1032,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -1075,14 +1057,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -1100,14 +1082,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -1125,14 +1107,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -1161,7 +1143,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<ConnectionDTO>() { @Override public boolean isNew() { return true; @@ -1213,7 +1195,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<ProcessorDTO>() { @Override public boolean isNew() { return true; @@ -1244,7 +1226,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<LabelDTO>() { @Override public boolean isNew() { return true; @@ -1275,7 +1257,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<FunnelDTO>() { @Override public boolean isNew() { return true; @@ -1366,7 +1348,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<FlowSnippetDTO>() { @Override public boolean isNew() { return false; @@ -1396,7 +1378,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet); responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false, false)); - return new ConfigurationResult() { + return new ConfigurationResult<SnippetDTO>() { @Override public boolean isNew() { return true; @@ -1426,7 +1408,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<PortDTO>() { @Override public boolean isNew() { return true; @@ -1456,7 +1438,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<PortDTO>() { @Override public boolean isNew() { return true; @@ -1486,7 +1468,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<ProcessGroupDTO>() { @Override public boolean isNew() { return true; @@ -1516,7 +1498,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<RemoteProcessGroupDTO>() { @Override public boolean isNew() { return true; @@ -1588,7 +1570,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<FlowSnippetDTO>() { @Override public boolean isNew() { return false; @@ -1611,14 +1593,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // create the archive controllerFacade.createArchive(); - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -1654,7 +1636,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult<ProcessorDTO>() { @Override public boolean isNew() { return false; @@ -1689,7 +1671,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { + return new ConfigurationResult<ControllerServiceDTO>() { @Override public boolean isNew() { return true; @@ -1723,7 +1705,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { + return new ConfigurationResult<ControllerServiceDTO>() { @Override public boolean isNew() { return false; @@ -1750,7 +1732,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public ConfigurationResult<Set<ControllerServiceReferencingComponentDTO>> execute() { final ControllerServiceReference reference = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); - return new ConfigurationResult() { + return new ConfigurationResult<Set<ControllerServiceReferencingComponentDTO>>() { @Override public boolean isNew() { return false; @@ -1780,14 +1762,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -1815,7 +1797,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { + return new ConfigurationResult<ReportingTaskDTO>() { @Override public boolean isNew() { return true; @@ -1849,7 +1831,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { + return new ConfigurationResult<ReportingTaskDTO>() { @Override public boolean isNew() { return false; @@ -1879,14 +1861,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { + return new ConfigurationResult<Void>() { @Override public boolean isNew() { return false; } @Override - public ControllerConfigurationDTO getConfiguration() { + public Void getConfiguration() { return null; } }; @@ -2105,78 +2087,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessGroupStatusDTO getProcessGroupStatus(String groupId) { - ProcessGroupStatusDTO statusReport; - if (properties.isClusterManager()) { - final ProcessGroupStatus mergedProcessGroupStatus = clusterManager.getProcessGroupStatus(groupId); - if (mergedProcessGroupStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for process group %s.", groupId)); - } - statusReport = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), mergedProcessGroupStatus); - } else { - statusReport = controllerFacade.getProcessGroupStatus(groupId); - } - return statusReport; + return controllerFacade.getProcessGroupStatus(groupId); } @Override public ControllerStatusDTO getControllerStatus() { - final ControllerStatusDTO controllerStatus; - - if (properties.isClusterManager()) { - final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED); - - if (connectedNodes.isEmpty()) { - throw new NoConnectedNodesException(); - } - - int activeThreadCount = 0; - long totalFlowFileObjectCount = 0; - long totalFlowFileByteCount = 0; - for (final Node node : connectedNodes) { - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - activeThreadCount += nodeHeartbeatPayload.getActiveThreadCount(); - totalFlowFileObjectCount += nodeHeartbeatPayload.getTotalFlowFileCount(); - totalFlowFileByteCount += nodeHeartbeatPayload.getTotalFlowFileBytes(); - } - - controllerStatus = new ControllerStatusDTO(); - controllerStatus.setActiveThreadCount(activeThreadCount); - controllerStatus.setQueued(FormatUtils.formatCount(totalFlowFileObjectCount) + " / " + FormatUtils.formatDataSize(totalFlowFileByteCount)); - - final int numNodes = clusterManager.getNodeIds().size(); - controllerStatus.setConnectedNodes(connectedNodes.size() + " / " + numNodes); - - // get the bulletins for the controller - final BulletinRepository bulletinRepository = clusterManager.getBulletinRepository(); - controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController())); - - // get the controller service bulletins - final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); - controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery))); - - // get the reporting task bulletins - final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); - controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery))); - - // get the component counts by extracting them from the roots' group status - final ProcessGroupStatus status = clusterManager.getProcessGroupStatus("root"); - if (status != null) { - final ProcessGroupCounts counts = extractProcessGroupCounts(status); - controllerStatus.setRunningCount(counts.getRunningCount()); - controllerStatus.setStoppedCount(counts.getStoppedCount()); - controllerStatus.setInvalidCount(counts.getInvalidCount()); - controllerStatus.setDisabledCount(counts.getDisabledCount()); - controllerStatus.setActiveRemotePortCount(counts.getActiveRemotePortCount()); - controllerStatus.setInactiveRemotePortCount(counts.getInactiveRemotePortCount()); - } - } else { - // get the controller status - controllerStatus = controllerFacade.getControllerStatus(); - } + // get the controller status + final ControllerStatusDTO controllerStatus = controllerFacade.getControllerStatus(); // determine if there are any pending user accounts - only include if appropriate if (NiFiUserUtils.getAuthorities().contains(Authority.ROLE_ADMIN.toString())) { @@ -2218,53 +2135,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public CountersDTO getCounters() { - if (properties.isClusterManager()) { - final Map<String, CounterDTO> mergedCountersMap = new HashMap<>(); - final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED); - - if (connectedNodes.isEmpty()) { - throw new NoConnectedNodesException(); - } - - for (final Node node : connectedNodes) { - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - final List<Counter> nodeCounters = node.getHeartbeatPayload().getCounters(); - if (nodeCounters == null) { - continue; - } - - // for each node, add its counter values to the aggregate values - for (final Counter nodeCounter : nodeCounters) { - final CounterDTO mergedCounter = mergedCountersMap.get(nodeCounter.getIdentifier()); - - // either create a new aggregate counter or update the aggregate counter - if (mergedCounter == null) { - // add new counter - mergedCountersMap.put(nodeCounter.getIdentifier(), dtoFactory.createCounterDto(nodeCounter)); - } else { - // update aggregate counter - mergedCounter.setValueCount(mergedCounter.getValueCount() + nodeCounter.getValue()); - mergedCounter.setValue(FormatUtils.formatCount(mergedCounter.getValueCount())); - } - } - } - - final CountersDTO mergedCounters = new CountersDTO(); - mergedCounters.setGenerated(new Date()); - mergedCounters.setCounters(mergedCountersMap.values()); - return mergedCounters; - } else { - List<Counter> counters = controllerFacade.getCounters(); - Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size()); - for (Counter counter : counters) { - counterDTOs.add(dtoFactory.createCounterDto(counter)); - } - return dtoFactory.createCountersDto(counterDTOs); + List<Counter> counters = controllerFacade.getCounters(); + Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size()); + for (Counter counter : counters) { + counterDTOs.add(dtoFactory.createCounterDto(counter)); } + final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs); + final CountersDTO countersDto = new CountersDTO(); + countersDto.setAggregateSnapshot(snapshotDto); + + return countersDto; } @Override @@ -2308,6 +2189,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ConnectionStatusDTO getConnectionStatus(String groupId, String connectionId) { + return controllerFacade.getConnectionStatus(groupId, connectionId); + } + + @Override public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) { return controllerFacade.getConnectionStatusHistory(groupId, connectionId); } @@ -2386,6 +2272,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ProcessorStatusDTO getProcessorStatus(String groupId, String id) { + return controllerFacade.getProcessorStatus(groupId, id); + } + + @Override public StatusHistoryDTO getProcessorStatusHistory(String groupId, String id) { return controllerFacade.getProcessorStatusHistory(groupId, id); } @@ -2429,18 +2320,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public SystemDiagnosticsDTO getSystemDiagnostics() { - final SystemDiagnosticsDTO dto; - if (properties.isClusterManager()) { - final SystemDiagnostics clusterDiagnostics = clusterManager.getSystemDiagnostics(); - if (clusterDiagnostics == null) { - throw new IllegalStateException("Nodes are connected but no systems diagnostics have been reported."); - } - dto = dtoFactory.createSystemDiagnosticsDto(clusterDiagnostics); - } else { - final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics(); - dto = dtoFactory.createSystemDiagnosticsDto(sysDiagnostics); - } - return dto; + final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics(); + return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics); } /** @@ -2651,16 +2532,31 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public PortStatusDTO getInputPortStatus(String groupId, String inputPortId) { + return controllerFacade.getInputPortStatus(groupId, inputPortId); + } + + @Override public PortDTO getOutputPort(String groupId, String outputPortId) { return dtoFactory.createPortDto(outputPortDAO.getPort(groupId, outputPortId)); } @Override + public PortStatusDTO getOutputPortStatus(String groupId, String outputPortId) { + return controllerFacade.getOutputPortStatus(groupId, outputPortId); + } + + @Override public RemoteProcessGroupDTO getRemoteProcessGroup(String groupId, String remoteProcessGroupId) { return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroupDAO.getRemoteProcessGroup(groupId, remoteProcessGroupId)); } @Override + public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus(String groupId, String id) { + return controllerFacade.getRemoteProcessGroupStatus(groupId, id); + } + + @Override public StatusHistoryDTO getRemoteProcessGroupStatusHistory(String groupId, String id) { return controllerFacade.getRemoteProcessGroupStatusHistory(groupId, id); } @@ -2934,575 +2830,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { clusterManager.deleteNode(nodeId, userDn); } - private ProcessorStatus findNodeProcessorStatus(final ProcessGroupStatus groupStatus, final String processorId) { - ProcessorStatus processorStatus = null; - - for (final ProcessorStatus status : groupStatus.getProcessorStatus()) { - if (processorId.equals(status.getId())) { - processorStatus = status; - break; - } - } - - if (processorStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - processorStatus = findNodeProcessorStatus(status, processorId); - - if (processorStatus != null) { - break; - } - } - } - - return processorStatus; - } - - // TODO Refactor!!! - @Override - public ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId) { - - final ClusterProcessorStatusDTO clusterProcessorStatusDto = new ClusterProcessorStatusDTO(); - clusterProcessorStatusDto.setNodeProcessorStatus(new ArrayList<NodeProcessorStatusDTO>()); - - // set the current time - clusterProcessorStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // attempt to find the processor stats for this node - final ProcessorStatus processorStatus = findNodeProcessorStatus(nodeStats, processorId); - - // sanity check that we have status for this processor - if (processorStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for processor id '%s'.", processorId)); - } - - if (firstNode) { - clusterProcessorStatusDto.setProcessorId(processorId); - clusterProcessorStatusDto.setProcessorName(processorStatus.getName()); - clusterProcessorStatusDto.setProcessorType(processorStatus.getType()); - clusterProcessorStatusDto.setProcessorRunStatus(processorStatus.getRunStatus().toString()); - firstNode = false; - } - - // create node processor status dto - final NodeProcessorStatusDTO nodeProcessorStatusDTO = new NodeProcessorStatusDTO(); - clusterProcessorStatusDto.getNodeProcessorStatus().add(nodeProcessorStatusDTO); - - // populate node processor status dto - final String nodeId = node.getNodeId().getId(); - nodeProcessorStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeProcessorStatusDTO.setProcessorStatus(dtoFactory.createProcessorStatusDto(processorStatus)); - - } - - return clusterProcessorStatusDto; - } - - private ConnectionStatus findNodeConnectionStatus(final ProcessGroupStatus groupStatus, final String connectionId) { - ConnectionStatus connectionStatus = null; - - for (final ConnectionStatus status : groupStatus.getConnectionStatus()) { - if (connectionId.equals(status.getId())) { - connectionStatus = status; - break; - } - } - - if (connectionStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - connectionStatus = findNodeConnectionStatus(status, connectionId); - - if (connectionStatus != null) { - break; - } - } - } - - return connectionStatus; - } - - @Override - public ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId) { - final ClusterConnectionStatusDTO clusterConnectionStatusDto = new ClusterConnectionStatusDTO(); - clusterConnectionStatusDto.setNodeConnectionStatus(new ArrayList<NodeConnectionStatusDTO>()); - - // set the current time - clusterConnectionStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // find the connection status for this node - final ConnectionStatus connectionStatus = findNodeConnectionStatus(nodeStats, connectionId); - - // sanity check that we have status for this connection - if (connectionStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for connection id '%s'.", connectionId)); - } - - if (firstNode) { - clusterConnectionStatusDto.setConnectionId(connectionId); - clusterConnectionStatusDto.setConnectionName(connectionStatus.getName()); - firstNode = false; - } - - // create node connection status dto - final NodeConnectionStatusDTO nodeConnectionStatusDTO = new NodeConnectionStatusDTO(); - clusterConnectionStatusDto.getNodeConnectionStatus().add(nodeConnectionStatusDTO); - - // populate node processor status dto - final String nodeId = node.getNodeId().getId(); - nodeConnectionStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeConnectionStatusDTO.setConnectionStatus(dtoFactory.createConnectionStatusDto(connectionStatus)); - - } - - return clusterConnectionStatusDto; - } - - private ProcessGroupStatus findNodeProcessGroupStatus(final ProcessGroupStatus groupStatus, final String processGroupId) { - ProcessGroupStatus processGroupStatus = null; - - if (processGroupId.equals(groupStatus.getId())) { - processGroupStatus = groupStatus; - } - - if (processGroupStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - processGroupStatus = findNodeProcessGroupStatus(status, processGroupId); - - if (processGroupStatus != null) { - break; - } - } - } - - return processGroupStatus; - } - - @Override - public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processGroupId) { - - final ClusterProcessGroupStatusDTO clusterProcessGroupStatusDto = new ClusterProcessGroupStatusDTO(); - clusterProcessGroupStatusDto.setNodeProcessGroupStatus(new ArrayList<NodeProcessGroupStatusDTO>()); - - // set the current time - clusterProcessGroupStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // attempt to find the process group stats for this node - final ProcessGroupStatus processGroupStatus = findNodeProcessGroupStatus(nodeStats, processGroupId); - - // sanity check that we have status for this process group - if (processGroupStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for process group id '%s'.", processGroupId)); - } - - if (firstNode) { - clusterProcessGroupStatusDto.setProcessGroupId(processGroupId); - clusterProcessGroupStatusDto.setProcessGroupName(processGroupStatus.getName()); - firstNode = false; - } - - // create node process group status dto - final NodeProcessGroupStatusDTO nodeProcessGroupStatusDTO = new NodeProcessGroupStatusDTO(); - clusterProcessGroupStatusDto.getNodeProcessGroupStatus().add(nodeProcessGroupStatusDTO); - - // populate node process group status dto - final String nodeId = node.getNodeId().getId(); - nodeProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeProcessGroupStatusDTO.setProcessGroupStatus(dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), processGroupStatus)); - - } - - return clusterProcessGroupStatusDto; - } - - private PortStatus findNodeInputPortStatus(final ProcessGroupStatus groupStatus, final String inputPortId) { - PortStatus portStatus = null; - - for (final PortStatus status : groupStatus.getInputPortStatus()) { - if (inputPortId.equals(status.getId())) { - portStatus = status; - break; - } - } - - if (portStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - portStatus = findNodeInputPortStatus(status, inputPortId); - - if (portStatus != null) { - break; - } - } - } - - return portStatus; - } - - @Override - public ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId) { - final ClusterPortStatusDTO clusterInputPortStatusDto = new ClusterPortStatusDTO(); - clusterInputPortStatusDto.setNodePortStatus(new ArrayList<NodePortStatusDTO>()); - - // set the current time - clusterInputPortStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // find the input status for this node - final PortStatus inputPortStatus = findNodeInputPortStatus(nodeStats, inputPortId); - - // sanity check that we have status for this input port - if (inputPortStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for input port id '%s'.", inputPortId)); - } - - if (firstNode) { - clusterInputPortStatusDto.setPortId(inputPortId); - clusterInputPortStatusDto.setPortName(inputPortStatus.getName()); - firstNode = false; - } - - // create node port status dto - final NodePortStatusDTO nodeInputPortStatusDTO = new NodePortStatusDTO(); - clusterInputPortStatusDto.getNodePortStatus().add(nodeInputPortStatusDTO); - - // populate node input port status dto - final String nodeId = node.getNodeId().getId(); - nodeInputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeInputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(inputPortStatus)); - } - - return clusterInputPortStatusDto; - } - - private PortStatus findNodeOutputPortStatus(final ProcessGroupStatus groupStatus, final String outputPortId) { - PortStatus portStatus = null; - - for (final PortStatus status : groupStatus.getOutputPortStatus()) { - if (outputPortId.equals(status.getId())) { - portStatus = status; - break; - } - } - - if (portStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - portStatus = findNodeOutputPortStatus(status, outputPortId); - - if (portStatus != null) { - break; - } - } - } - - return portStatus; - } - - @Override - public ClusterPortStatusDTO getClusterOutputPortStatus(String outputPortId) { - final ClusterPortStatusDTO clusterOutputPortStatusDto = new ClusterPortStatusDTO(); - clusterOutputPortStatusDto.setNodePortStatus(new ArrayList<NodePortStatusDTO>()); - - // set the current time - clusterOutputPortStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // find the output status for this node - final PortStatus outputPortStatus = findNodeOutputPortStatus(nodeStats, outputPortId); - - // sanity check that we have status for this output port - if (outputPortStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for output port id '%s'.", outputPortId)); - } - - if (firstNode) { - clusterOutputPortStatusDto.setPortId(outputPortId); - clusterOutputPortStatusDto.setPortName(outputPortStatus.getName()); - firstNode = false; - } - - // create node port status dto - final NodePortStatusDTO nodeOutputPortStatusDTO = new NodePortStatusDTO(); - clusterOutputPortStatusDto.getNodePortStatus().add(nodeOutputPortStatusDTO); - - // populate node output port status dto - final String nodeId = node.getNodeId().getId(); - nodeOutputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeOutputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(outputPortStatus)); - } - - return clusterOutputPortStatusDto; - } - - private RemoteProcessGroupStatus findNodeRemoteProcessGroupStatus(final ProcessGroupStatus groupStatus, final String remoteProcessGroupId) { - RemoteProcessGroupStatus remoteProcessGroupStatus = null; - - for (final RemoteProcessGroupStatus status : groupStatus.getRemoteProcessGroupStatus()) { - if (remoteProcessGroupId.equals(status.getId())) { - remoteProcessGroupStatus = status; - break; - } - } - - if (remoteProcessGroupStatus == null) { - for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) { - remoteProcessGroupStatus = findNodeRemoteProcessGroupStatus(status, remoteProcessGroupId); - - if (remoteProcessGroupStatus != null) { - break; - } - } - } - - return remoteProcessGroupStatus; - } - - @Override - public ClusterRemoteProcessGroupStatusDTO getClusterRemoteProcessGroupStatus(String remoteProcessGroupId) { - final ClusterRemoteProcessGroupStatusDTO clusterRemoteProcessGroupStatusDto = new ClusterRemoteProcessGroupStatusDTO(); - clusterRemoteProcessGroupStatusDto.setNodeRemoteProcessGroupStatus(new ArrayList<NodeRemoteProcessGroupStatusDTO>()); - - // set the current time - clusterRemoteProcessGroupStatusDto.setStatsLastRefreshed(new Date()); - - final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED); - boolean firstNode = true; - for (final Node node : nodes) { - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeStats == null || nodeStats.getProcessorStatus() == null) { - continue; - } - - // find the remote process group for this node - final RemoteProcessGroupStatus remoteProcessGroupStatus = findNodeRemoteProcessGroupStatus(nodeStats, remoteProcessGroupId); - - // sanity check that we have status for this remote process group - if (remoteProcessGroupStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for remote process group id '%s'.", remoteProcessGroupId)); - } - - if (firstNode) { - clusterRemoteProcessGroupStatusDto.setRemoteProcessGroupId(remoteProcessGroupId); - clusterRemoteProcessGroupStatusDto.setRemoteProcessGroupName(remoteProcessGroupStatus.getName()); - firstNode = false; - } - - // create node remote process group status dto - final NodeRemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatusDTO = new NodeRemoteProcessGroupStatusDTO(); - clusterRemoteProcessGroupStatusDto.getNodeRemoteProcessGroupStatus().add(nodeRemoteProcessGroupStatusDTO); - - // populate node remote process group status dto - final String nodeId = node.getNodeId().getId(); - nodeRemoteProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - nodeRemoteProcessGroupStatusDTO.setRemoteProcessGroupStatus(dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupStatus)); - } - - return clusterRemoteProcessGroupStatusDto; - } - - @Override - public ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId) { - return clusterManager.getProcessorStatusHistory(processorId); - } - - @Override - public ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId) { - return clusterManager.getConnectionStatusHistory(connectionId); - } - - @Override - public ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId) { - return clusterManager.getProcessGroupStatusHistory(processGroupId); - } - - @Override - public ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId) { - return clusterManager.getRemoteProcessGroupStatusHistory(remoteProcessGroupId); - } - - @Override - public NodeStatusDTO getNodeStatus(String nodeId) { - // find the node in question - final Node node = clusterManager.getNode(nodeId); - - // verify node state - if (node == null) { - throw new UnknownNodeException("Node does not exist."); - } else if (Node.Status.CONNECTED != node.getStatus()) { - throw new IllegalClusterStateException( - String.format("Node '%s:%s' is not connected to the cluster.", - node.getNodeId().getApiAddress(), node.getNodeId().getApiPort())); - } - - // get the node's last heartbeat - final NodeStatusDTO nodeStatus = new NodeStatusDTO(); - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - return nodeStatus; - } - - // get the node status - final ProcessGroupStatus nodeProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeProcessGroupStatus == null) { - return nodeStatus; - } - - final ProcessGroupStatusDTO nodeProcessGroupStatusDto = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), nodeProcessGroupStatus); - nodeStatus.setControllerStatus(nodeProcessGroupStatusDto); - nodeStatus.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - - return nodeStatus; - } - - @Override - public NodeSystemDiagnosticsDTO getNodeSystemDiagnostics(String nodeId) { - // find the node in question - final Node node = clusterManager.getNode(nodeId); - - // verify node state - if (node == null) { - throw new UnknownNodeException("Node does not exist."); - } else if (Node.Status.CONNECTED != node.getStatus()) { - throw new IllegalClusterStateException( - String.format("Node '%s:%s' is not connected to the cluster.", - node.getNodeId().getApiAddress(), node.getNodeId().getApiPort())); - } - - // get the node's last heartbeat - final NodeSystemDiagnosticsDTO nodeStatus = new NodeSystemDiagnosticsDTO(); - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - return nodeStatus; - } - - // get the node status - final SystemDiagnostics nodeSystemDiagnostics = nodeHeartbeatPayload.getSystemDiagnostics(); - if (nodeSystemDiagnostics == null) { - return nodeStatus; - } - - // populate the dto - nodeStatus.setControllerStatus(dtoFactory.createSystemDiagnosticsDto(nodeSystemDiagnostics)); - nodeStatus.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - - return nodeStatus; - } - - @Override - public ClusterStatusDTO getClusterStatus() { - - // create cluster status dto - final ClusterStatusDTO clusterStatusDto = new ClusterStatusDTO(); - - // populate node status dtos - final Collection<NodeStatusDTO> nodeStatusDtos = new ArrayList<>(); - clusterStatusDto.setNodeStatus(nodeStatusDtos); - - for (final Node node : clusterManager.getNodes()) { - - if (Node.Status.CONNECTED != node.getStatus()) { - continue; - } - - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - final ProcessGroupStatus nodeProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus(); - if (nodeProcessGroupStatus == null) { - continue; - } - - final ProcessGroupStatusDTO nodeProcessGroupStatusDto = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), nodeProcessGroupStatus); - - // create node status dto - final NodeStatusDTO nodeStatusDto = new NodeStatusDTO(); - nodeStatusDtos.add(nodeStatusDto); - - // populate the status - nodeStatusDto.setControllerStatus(nodeProcessGroupStatusDto); - - // create and add node dto - final String nodeId = node.getNodeId().getId(); - nodeStatusDto.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId))); - - } - - return clusterStatusDto; - } - @Override public ProcessorDTO getProcessor(String id) { ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -3650,91 +2977,4 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return date1; } } - - /** - * Utility method for extracting component counts from the specified group status. - */ - private ProcessGroupCounts extractProcessGroupCounts(ProcessGroupStatus groupStatus) { - int running = 0; - int stopped = 0; - int invalid = 0; - int disabled = 0; - int activeRemotePorts = 0; - int inactiveRemotePorts = 0; - - for (final ProcessorStatus processorStatus : groupStatus.getProcessorStatus()) { - switch (processorStatus.getRunStatus()) { - case Disabled: - disabled++; - break; - case Running: - running++; - break; - case Invalid: - invalid++; - break; - default: - stopped++; - break; - } - } - - for (final PortStatus portStatus : groupStatus.getInputPortStatus()) { - switch (portStatus.getRunStatus()) { - case Disabled: - disabled++; - break; - case Running: - running++; - break; - case Invalid: - invalid++; - break; - default: - stopped++; - break; - } - } - - for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) { - switch (portStatus.getRunStatus()) { - case Disabled: - disabled++; - break; - case Running: - running++; - break; - case Invalid: - invalid++; - break; - default: - stopped++; - break; - } - } - - for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) { - if (remoteStatus.getActiveRemotePortCount() != null) { - activeRemotePorts += remoteStatus.getActiveRemotePortCount(); - } - if (remoteStatus.getInactiveRemotePortCount() != null) { - inactiveRemotePorts += remoteStatus.getInactiveRemotePortCount(); - } - if (CollectionUtils.isNotEmpty(remoteStatus.getAuthorizationIssues())) { - invalid++; - } - } - - for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) { - final ProcessGroupCounts childCounts = extractProcessGroupCounts(childGroupStatus); - running += childCounts.getRunningCount(); - stopped += childCounts.getStoppedCount(); - invalid += childCounts.getInvalidCount(); - disabled += childCounts.getDisabledCount(); - activeRemotePorts += childCounts.getActiveRemotePortCount(); - inactiveRemotePorts += childCounts.getInactiveRemotePortCount(); - } - - return new ProcessGroupCounts(0, 0, running, stopped, invalid, disabled, activeRemotePorts, inactiveRemotePorts); - } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 6f895b8..3b429e7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -79,6 +79,8 @@ public abstract class ApplicationResource { private static final int CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES = (int) (0.75 * HEADER_BUFFER_SIZE); private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class); + public static final String NODEWISE = "false"; + @Context private HttpServletRequest httpServletRequest; http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java index 6197953..d13b5c9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java @@ -16,20 +16,19 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; @@ -39,18 +38,23 @@ import org.apache.nifi.web.api.request.BulletinBoardPatternParameter; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; + /** * RESTful endpoint for managing a Template. */ @Api(hidden = true) public class BulletinBoardResource extends ApplicationResource { - private static final Logger logger = LoggerFactory.getLogger(BulletinBoardResource.class); + private NiFiProperties properties; + private WebClusterManager clusterManager; private NiFiServiceFacade serviceFacade; @@ -128,6 +132,11 @@ public class BulletinBoardResource extends ApplicationResource { ) @QueryParam("limit") IntegerParameter limit) { + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + // build the bulletin query final BulletinQueryDTO query = new BulletinQueryDTO(); @@ -171,4 +180,11 @@ public class BulletinBoardResource extends ApplicationResource { this.serviceFacade = serviceFacade; } + public void setClusterManager(WebClusterManager clusterManager) { + this.clusterManager = clusterManager; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } }
