http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index c0f4c63..3d21b55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -30,7 +30,6 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -73,7 +72,6 @@ import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnRemoved; -import org.apache.nifi.cluster.BulletinsPayload; import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextImpl; @@ -109,7 +107,6 @@ import org.apache.nifi.cluster.node.Node.Status; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeBulletins; import org.apache.nifi.cluster.protocol.NodeIdentifier; import 
org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; @@ -121,7 +118,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; @@ -153,11 +149,13 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.SortedStateUtils; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; import org.apache.nifi.controller.status.history.MetricDescriptor; -import org.apache.nifi.controller.status.history.StatusHistory; +import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; +import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; import org.apache.nifi.controller.status.history.StatusHistoryUtil; import org.apache.nifi.controller.status.history.StatusSnapshot; import org.apache.nifi.diagnostics.GarbageCollection; @@ -188,7 +186,9 @@ import 
org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol; import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; @@ -202,6 +202,7 @@ import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.OptimisticLockingManager; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UpdateRevision; +import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; @@ -209,7 +210,6 @@ import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; -import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.QueueSizeDTO; @@ -223,18 +223,23 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import 
org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.StatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.StatusMerger; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.apache.nifi.web.api.entity.ComponentStateEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.ControllerStatusEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowSnippetEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; @@ -243,6 +248,7 @@ import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.ReportingTasksEntity; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.util.WebUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -321,6 +327,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}"); public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))"); + public static final Pattern 
GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status"); + public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/status"); public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance"); public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance"); @@ -337,6 +345,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state"); + public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = + Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history"); + public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history"); + public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern + .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status/history"); + public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern + .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status/history"); + + @Deprecated public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); public static final Pattern DROP_REQUESTS_URI 
= Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests"); @@ -378,7 +395,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C private final BulletinRepository bulletinRepository; private final String instanceId; private final FlowEngine reportingTaskEngine; - private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>(); private final StandardProcessScheduler processScheduler; private final StateManagerProvider stateManagerProvider; private final long componentStatusSnapshotMillis; @@ -463,7 +479,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor)); processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); - processScheduler.scheduleFrameworkTask(new CaptureComponentMetrics(), "Capture Component Metrics", componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS); controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); } @@ -620,7 +635,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.HEARTBEAT == msg.getType() || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType() - || MessageType.BULLETINS == msg.getType() || MessageType.RECONNECTION_FAILURE == msg.getType(); } @@ -654,10 +668,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start(); return null; - case BULLETINS: - final NodeBulletinsMessage bulletinsMessage 
= (NodeBulletinsMessage) protocolMessage; - handleBulletins(bulletinsMessage.getBulletins()); - return null; default: throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType()); } @@ -1686,22 +1696,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C processScheduler.enableReportingTask(reportingTask); } - /** - * Handle a bulletins message. - * - * @param bulletins bulletins - */ - public void handleBulletins(final NodeBulletins bulletins) { - final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier(); - final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); - - // unmarshal the message - final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload()); - for (final Bulletin bulletin : payload.getBulletins()) { - bulletin.setNodeAddress(nodeAddress); - bulletinRepository.addBulletin(bulletin); - } - } /** * Handles a node's heartbeat. If this heartbeat is a node's first heartbeat since its connection request, then the manager will mark the node as connected. 
If the node was previously disconnected @@ -2442,6 +2436,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches(); } + private static boolean isGroupStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isControllerStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isTemplateEndpoint(final URI uri, final String method) { return "POST".equalsIgnoreCase(method) && TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches(); } @@ -2454,6 +2456,23 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches(); } + private static boolean isProcessorStatusHistoryEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isProcessGroupStatusHistoryEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isRemoteProcessGroupStatusHistoryEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isConnectionStatusHistoryEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean 
isRemoteProcessGroupEndpoint(final URI uri, final String method) { if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) { return true; @@ -2556,7 +2575,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method) - || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method); + || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method) + || isGroupStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) + || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method) + || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method); } private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) { @@ -2608,6 +2630,159 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C componentState.getLocalState().setState(localStateEntries); } + @SuppressWarnings("unchecked") + private void updateBulletins(final String nodeAddress, final Collection<? extends StatusDTO>... dtos) { + for (final Collection<? 
extends StatusDTO> collection : dtos) { + if (collection != null) { + for (final StatusDTO dto : collection) { + final List<BulletinDTO> bulletins = dto.getBulletins(); + if (bulletins != null) { + for (final BulletinDTO bulletin : bulletins) { + bulletin.setNodeAddress(nodeAddress); + } + } + } + } + } + } + + @SuppressWarnings("unchecked") + private void updateBulletins(final ProcessGroupStatusDTO dto, final String nodeAddress) { + for (final BulletinDTO bulletin : dto.getBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + + updateBulletins(nodeAddress, dto.getProcessorStatus(), dto.getInputPortStatus(), dto.getOutputPortStatus(), dto.getRemoteProcessGroupStatus()); + + if (dto.getProcessGroupStatus() != null) { + for (final ProcessGroupStatusDTO childGroup : dto.getProcessGroupStatus()) { + updateBulletins(childGroup, nodeAddress); + } + } + } + + private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) { + ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto; + for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessGroupStatusDTO nodeProcessGroupStatus = entry.getValue(); + + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + updateBulletins(mergedProcessGroupStatus, nodeAddress); + + if (nodeProcessGroupStatus == mergedProcessGroupStatus) { + continue; + } + + final ProcessGroupStatusDTO nodeClone = nodeProcessGroupStatus.clone(); + for (final RemoteProcessGroupStatusDTO remoteProcessGroupStatus : nodeClone.getRemoteProcessGroupStatus()) { + final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); + if (!nodeAuthorizationIssues.isEmpty()) { + for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { + final String Issue = iter.next(); + iter.set("[" + 
nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue); + } + remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues); + } + } + + StatusMerger.merge(mergedProcessGroupStatus, nodeClone); + } + + StatusMerger.updatePrettyPrintedFields(mergedProcessGroupStatus); + } + + + private void mergeControllerStatus(final ControllerStatusDTO statusDto, final Map<NodeIdentifier, ControllerStatusDTO> resultMap) { + ControllerStatusDTO mergedStatus = statusDto; + for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ControllerStatusDTO nodeStatus = entry.getValue(); + + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + for (final BulletinDTO bulletin : nodeStatus.getBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + for (final BulletinDTO bulletin : nodeStatus.getControllerServiceBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + for (final BulletinDTO bulletin : nodeStatus.getReportingTaskBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + + if (nodeStatus == mergedStatus) { + continue; + } + + StatusMerger.merge(mergedStatus, nodeStatus); + } + + final int totalNodeCount = getNodeIds().size(); + final int connectedNodeCount = getNodeIds(Status.CONNECTED).size(); + + final List<Bulletin> ncmControllerBulletins = getBulletinRepository().findBulletinsForController(); + mergedStatus.setBulletins(mergeNCMBulletins(mergedStatus.getBulletins(), ncmControllerBulletins)); + + // get the controller service bulletins + final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); + final List<Bulletin> ncmServiceBulletins = getBulletinRepository().findBulletins(controllerServiceQuery); + mergedStatus.setControllerServiceBulletins(mergeNCMBulletins(mergedStatus.getControllerServiceBulletins(), ncmServiceBulletins)); + + // get the reporting 
task bulletins + final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); + final List<Bulletin> ncmReportingTaskBulletins = getBulletinRepository().findBulletins(reportingTaskQuery); + mergedStatus.setReportingTaskBulletins(mergeNCMBulletins(mergedStatus.getReportingTaskBulletins(), ncmReportingTaskBulletins)); + + mergedStatus.setConnectedNodeCount(connectedNodeCount); + mergedStatus.setTotalNodeCount(totalNodeCount); + StatusMerger.updatePrettyPrintedFields(mergedStatus); + } + + private List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> nodeBulletins, final List<Bulletin> ncmBulletins) { + if (ncmBulletins == null || ncmBulletins.isEmpty()) { + return nodeBulletins; + } + + final List<BulletinDTO> mergedBulletins = new ArrayList<>(nodeBulletins.size() + ncmBulletins.size()); + mergedBulletins.addAll(nodeBulletins); + mergedBulletins.addAll(createBulletinDtos(ncmBulletins)); + return mergedBulletins; + } + + /** + * Creates BulletinDTOs for the specified Bulletins. + * + * @param bulletins bulletin + * @return dto + */ + public List<BulletinDTO> createBulletinDtos(final List<Bulletin> bulletins) { + final List<BulletinDTO> bulletinDtos = new ArrayList<>(bulletins.size()); + for (final Bulletin bulletin : bulletins) { + bulletinDtos.add(createBulletinDto(bulletin)); + } + return bulletinDtos; + } + + /** + * Creates a BulletinDTO for the specified Bulletin. 
+ * + * @param bulletin bulletin + * @return dto + */ + public BulletinDTO createBulletinDto(final Bulletin bulletin) { + final BulletinDTO dto = new BulletinDTO(); + dto.setId(bulletin.getId()); + dto.setNodeAddress(bulletin.getNodeAddress()); + dto.setTimestamp(bulletin.getTimestamp()); + dto.setGroupId(bulletin.getGroupId()); + dto.setSourceId(bulletin.getSourceId()); + dto.setSourceName(bulletin.getSourceName()); + dto.setCategory(bulletin.getCategory()); + dto.setLevel(bulletin.getLevel()); + dto.setMessage(bulletin.getMessage()); + return dto; + } + private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) { final ProvenanceResultsDTO results = provenanceDto.getResults(); final ProvenanceRequestDTO request = provenanceDto.getRequest(); @@ -3545,6 +3720,70 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C mergeComponentState(componentState, resultsMap); clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isGroupStatusEndpoint(uri, method)) { + final ProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class); + final ProcessGroupStatusDTO statusRequest = responseEntity.getProcessGroupStatus(); + + final Map<NodeIdentifier, ProcessGroupStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ProcessGroupStatusEntity nodeResponseEntity = nodeResponse == clientResponse ? 
responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class); + final ProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getProcessGroupStatus(); + + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeGroupStatus(statusRequest, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isControllerStatusEndpoint(uri, method)) { + final ControllerStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerStatusEntity.class); + final ControllerStatusDTO statusRequest = responseEntity.getControllerStatus(); + + final Map<NodeIdentifier, ControllerStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ControllerStatusEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerStatusEntity.class); + final ControllerStatusDTO nodeStatus = nodeResponseEntity.getControllerStatus(); + + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeControllerStatus(statusRequest, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isProcessorStatusHistoryEndpoint(uri, method)) { + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + + clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors); + } else if (hasSuccessfulClientResponse && isConnectionStatusHistoryEndpoint(uri, method)) { + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + for (final 
ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + + clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors); + } else if (hasSuccessfulClientResponse && isProcessGroupStatusHistoryEndpoint(uri, method)) { + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + + clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors); + } else if (hasSuccessfulClientResponse && isRemoteProcessGroupStatusHistoryEndpoint(uri, method)) { + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + + clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors); } else { if (!nodeResponsesToDrain.isEmpty()) { drainResponses(nodeResponsesToDrain); @@ -3603,6 +3842,44 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return clientResponse; } + + private NodeResponse mergeStatusHistoryResponses(NodeResponse clientResponse, Map<Node, NodeResponse> updatedNodesMap, Set<NodeResponse> problematicNodeResponses, + Map<String, MetricDescriptor<?>> metricDescriptors) { + final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class); + + StatusHistoryDTO lastStatusHistory = null; + final Map<String, List<StatusSnapshotDTO>> nodeStatusHistories = new HashMap<>(updatedNodesMap.size()); + for (final NodeResponse 
nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final StatusHistoryEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(StatusHistoryEntity.class); + final StatusHistoryDTO nodeStatus = nodeResponseEntity.getStatusHistory(); + lastStatusHistory = nodeStatus; + + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + final String nodeName = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + nodeStatusHistories.put(nodeName, nodeStatus.getAggregateStatusSnapshots()); + } + + final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); + clusterStatusHistory.setAggregateStatusSnapshots(mergeStatusHistories(nodeStatusHistories, metricDescriptors)); + clusterStatusHistory.setGenerated(new Date()); + clusterStatusHistory.setNodeStatusSnapshots(nodeStatusHistories); + if (lastStatusHistory != null) { + clusterStatusHistory.setComponentDetails(lastStatusHistory.getComponentDetails()); + clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors()); + } + + final StatusHistoryEntity clusterEntity = new StatusHistoryEntity(); + clusterEntity.setStatusHistory(clusterStatusHistory); + clusterEntity.setRevision(responseEntity.getRevision()); + + return new NodeResponse(clientResponse, clusterEntity); + } + + /** * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and is not comprised of responses from all nodes in the cluster (at least * one node contained the counter in question). 
@@ -4026,78 +4303,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return bulletinRepository; } - @Override - public ProcessGroupStatus getProcessGroupStatus(final String groupId) { - final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED); - - // ensure there are some nodes in the cluster - if (connectedNodes.isEmpty()) { - throw new NoConnectedNodesException(); - } - - ProcessGroupStatus mergedProcessGroupStatus = null; - for (final Node node : connectedNodes) { - final NodeIdentifier nodeId = node.getNodeId(); - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - final ProcessGroupStatus nodeRootProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus(); - final ProcessGroupStatus nodeProcessGroupStatus = groupId.equals(ROOT_GROUP_ID_ALIAS) ? nodeRootProcessGroupStatus : getProcessGroupStatus(nodeRootProcessGroupStatus, groupId); - if (nodeProcessGroupStatus == null) { - continue; - } - - if (mergedProcessGroupStatus == null) { - mergedProcessGroupStatus = nodeProcessGroupStatus.clone(); - - // update any issues with the node label - if (mergedProcessGroupStatus.getRemoteProcessGroupStatus() != null) { - for (final RemoteProcessGroupStatus remoteProcessGroupStatus : mergedProcessGroupStatus.getRemoteProcessGroupStatus()) { - final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); - if (!nodeAuthorizationIssues.isEmpty()) { - for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { - final String Issue = iter.next(); - iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue); - } - remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues); - } - } - } - } else { - final ProcessGroupStatus nodeClone = nodeProcessGroupStatus.clone(); - for (final RemoteProcessGroupStatus remoteProcessGroupStatus : 
nodeClone.getRemoteProcessGroupStatus()) { - final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); - if (!nodeAuthorizationIssues.isEmpty()) { - for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { - final String Issue = iter.next(); - iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue); - } - remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues); - } - } - - ProcessGroupStatus.merge(mergedProcessGroupStatus, nodeClone); - } - } - - return mergedProcessGroupStatus; - } - - private ProcessGroupStatus getProcessGroupStatus(final ProcessGroupStatus parent, final String groupId) { - if (parent.getId().equals(groupId)) { - return parent; - } - - for (final ProcessGroupStatus child : parent.getProcessGroupStatus()) { - final ProcessGroupStatus matching = getProcessGroupStatus(child, groupId); - if (matching != null) { - return matching; - } - } - - return null; - } @Override public SystemDiagnostics getSystemDiagnostics() { @@ -4214,19 +4419,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return new Date(time - time % numMillis); } - private NodeDTO createNodeDTO(final Node node) { - final NodeDTO nodeDto = new NodeDTO(); - final NodeIdentifier nodeId = node.getNodeId(); - nodeDto.setNodeId(nodeId.getId()); - nodeDto.setAddress(nodeId.getApiAddress()); - nodeDto.setApiPort(nodeId.getApiPort()); - nodeDto.setStatus(node.getStatus().name()); - nodeDto.setPrimary(node.equals(getPrimaryNode())); - final Date connectionRequested = new Date(node.getConnectionRequestedTimestamp()); - nodeDto.setConnectionRequested(connectionRequested); - - return nodeDto; - } private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> snapshotsToAggregate) { // Aggregate the snapshots @@ -4245,185 +4437,34 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return 
aggregatedSnapshotDtos; } - public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId) { - return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE); - } - - public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startDate, final Date endDate, final int preferredDataPoints) { - final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); - - StatusHistoryDTO lastStatusHistory = null; - final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>(); - final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); - for (final Node node : getRawNodes()) { - final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - continue; - } - - final StatusHistory statusHistory = statusRepository.getProcessorStatusHistory(processorId, startDate, endDate, preferredDataPoints); - if (statusHistory == null) { - continue; - } + private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, final Map<String, MetricDescriptor<?>> metricDescriptors) { + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); + snapshot.setTimestamp(snapshotDto.getTimestamp()); - processorDescriptors.addAll(statusRepository.getProcessorMetricDescriptors()); + final Map<String, Long> metrics = snapshotDto.getStatusMetrics(); + for (final Map.Entry<String, Long> entry : metrics.entrySet()) { + final String metricId = entry.getKey(); + final Long value = entry.getValue(); - // record the status history (last) to get the component details for use later - final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory); - lastStatusHistory = statusHistoryDto; - - final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO(); - nodeHistory.setStatusHistory(statusHistoryDto); - nodeHistory.setNode(createNodeDTO(node)); - nodeHistories.add(nodeHistory); - - // collect 
all of the snapshots to aggregate - for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { - final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); - List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); - if (snapshots == null) { - snapshots = new ArrayList<>(); - snapshotsToAggregate.put(normalizedDate, snapshots); - } - snapshots.add(snapshot); - } - } - - // Aggregate the snapshots - final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate); - - // get the details for this component from the last status history - final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>(); - clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails()); - - final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); - clusterStatusHistory.setGenerated(new Date()); - clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processorDescriptors)); - clusterStatusHistory.setDetails(clusterStatusHistoryDetails); - clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos); - - final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO(); - history.setGenerated(new Date()); - history.setNodeStatusHistory(nodeHistories); - history.setClusterStatusHistory(clusterStatusHistory); - return history; - } - - public StatusHistoryDTO createStatusHistoryDto(final StatusHistory statusHistory) { - final StatusHistoryDTO dto = new StatusHistoryDTO(); - - dto.setDetails(new LinkedHashMap<>(statusHistory.getComponentDetails())); - dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(statusHistory)); - dto.setGenerated(statusHistory.getDateGenerated()); - - final List<StatusSnapshotDTO> statusSnapshots = new ArrayList<>(); - for (final StatusSnapshot statusSnapshot : statusHistory.getStatusSnapshots()) { - 
statusSnapshots.add(StatusHistoryUtil.createStatusSnapshotDto(statusSnapshot)); - } - dto.setStatusSnapshots(statusSnapshots); - - return dto; - } - - public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId) { - return getConnectionStatusHistory(connectionId, null, null, Integer.MAX_VALUE); - } - - public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId, final Date startDate, final Date endDate, final int preferredDataPoints) { - final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); - - StatusHistoryDTO lastStatusHistory = null; - final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>(); - final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); - - for (final Node node : getRawNodes()) { - final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - continue; - } - - final StatusHistory statusHistory = statusRepository.getConnectionStatusHistory(connectionId, startDate, endDate, preferredDataPoints); - if (statusHistory == null) { - continue; - } - - final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory); - // record the status history (last) to get the componet details for use later - lastStatusHistory = statusHistoryDto; - connectionDescriptors.addAll(statusRepository.getConnectionMetricDescriptors()); - - final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO(); - nodeHistory.setStatusHistory(statusHistoryDto); - nodeHistory.setNode(createNodeDTO(node)); - nodeHistories.add(nodeHistory); - - // collect all of the snapshots to aggregate - for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { - final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); - List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); - if (snapshots == null) { - snapshots = 
new ArrayList<>(); - snapshotsToAggregate.put(normalizedDate, snapshots); - } - snapshots.add(snapshot); + final MetricDescriptor<?> descriptor = metricDescriptors.get(metricId); + if (descriptor != null) { + snapshot.addStatusMetric(descriptor, value); } } - // Aggregate the snapshots - final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate); - - // get the details for this component from the last status history - final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>(); - clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails()); - - final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); - clusterStatusHistory.setGenerated(new Date()); - clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(connectionDescriptors)); - clusterStatusHistory.setDetails(clusterStatusHistoryDetails); - clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos); - - final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO(); - history.setGenerated(new Date()); - history.setNodeStatusHistory(nodeHistories); - history.setClusterStatusHistory(clusterStatusHistory); - return history; - } - - public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) { - return getProcessGroupStatusHistory(processGroupId, null, null, Integer.MAX_VALUE); + return snapshot; } - public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) { - final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); - - StatusHistoryDTO lastStatusHistory = null; - final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>(); + private List<StatusSnapshotDTO> mergeStatusHistories(final Map<String, List<StatusSnapshotDTO>> nodeStatusHistories, final Map<String, MetricDescriptor<?>> metricDescriptors) { + // Map of "normalized Date" (i.e., a 
time range, essentially) to all Snapshots for that time. The list + // will contain one snapshot for each node. final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); - for (final Node node : getRawNodes()) { - final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - continue; - } - - final StatusHistory statusHistory = statusRepository.getProcessGroupStatusHistory(processGroupId, startDate, endDate, preferredDataPoints); - if (statusHistory == null) { - continue; - } - - final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory); - // record the status history (last) to get the componet details for use later - lastStatusHistory = statusHistoryDto; - processGroupDescriptors.addAll(statusRepository.getProcessGroupMetricDescriptors()); - - final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO(); - nodeHistory.setStatusHistory(statusHistoryDto); - nodeHistory.setNode(createNodeDTO(node)); - nodeHistories.add(nodeHistory); - - // collect all of the snapshots to aggregate - for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { + // group status snapshots for each node by date + for (final List<StatusSnapshotDTO> snapshotDtos : nodeStatusHistories.values()) { + for (final StatusSnapshotDTO snapshotDto : snapshotDtos) { + final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors); final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); if (snapshots == null) { @@ -4434,89 +4475,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } - // Aggregate the snapshots - final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate); - - // get the details for this component from the last status history - final
LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>(); - clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails()); - - final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); - clusterStatusHistory.setGenerated(new Date()); - clusterStatusHistory.setDetails(clusterStatusHistoryDetails); - clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processGroupDescriptors)); - clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos); - - final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO(); - history.setGenerated(new Date()); - history.setNodeStatusHistory(nodeHistories); - history.setClusterStatusHistory(clusterStatusHistory); - return history; + // aggregate the snapshots by (normalized) timestamp + final List<StatusSnapshotDTO> aggregatedSnapshots = aggregate(snapshotsToAggregate); + return aggregatedSnapshots; } - public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId) { - return getRemoteProcessGroupStatusHistory(remoteGroupId, null, null, Integer.MAX_VALUE); - } - - public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) { - final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); - - StatusHistoryDTO lastStatusHistory = null; - final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>(); - final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); - - for (final Node node : getRawNodes()) { - final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - continue; - } - - final StatusHistory statusHistory = statusRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startDate, endDate, preferredDataPoints); - if (statusHistory == null) { - continue; - } - - final 
StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory); - // record the status history (last) to get the componet details for use later - lastStatusHistory = statusHistoryDto; - remoteProcessGroupDescriptors.addAll(statusRepository.getRemoteProcessGroupMetricDescriptors()); - final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO(); - nodeHistory.setStatusHistory(statusHistoryDto); - nodeHistory.setNode(createNodeDTO(node)); - nodeHistories.add(nodeHistory); - - // collect all of the snapshots to aggregate - for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { - final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); - List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); - if (snapshots == null) { - snapshots = new ArrayList<>(); - snapshotsToAggregate.put(normalizedDate, snapshots); - } - snapshots.add(snapshot); - } - } - - // Aggregate the snapshots - final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate); - - // get the details for this component from the last status history - final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>(); - clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails()); - - final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); - clusterStatusHistory.setGenerated(new Date()); - clusterStatusHistory.setDetails(clusterStatusHistoryDetails); - clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(remoteProcessGroupDescriptors)); - clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos); - - final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO(); - history.setGenerated(new Date()); - history.setNodeStatusHistory(nodeHistories); - history.setClusterStatusHistory(clusterStatusHistory); - return history; - } private static class ClusterManagerLock { @@ -4583,41 +4547,4 @@ 
public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) { return controllerServiceProvider.getControllerServiceIdentifiers(serviceType); } - - /** - * Captures snapshots of components' metrics - */ - private class CaptureComponentMetrics implements Runnable { - @Override - public void run() { - readLock.lock(); - try { - for (final Node node : nodes) { - if (Status.CONNECTED.equals(node.getStatus())) { - ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - statusRepository = createComponentStatusRepository(); - componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository); - } - - // ensure this node has a payload - if (node.getHeartbeat() != null && node.getHeartbeatPayload() != null) { - // if nothing has been captured or the current heartbeat is newer, capture it - comparing the heatbeat created timestamp - // is safe since its marked as XmlTransient so we're assured that its based off the same clock that created the last capture date - if (statusRepository.getLastCaptureDate() == null || node.getHeartbeat().getCreatedTimestamp() > statusRepository.getLastCaptureDate().getTime()) { - statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus()); - } - } - } - } - } catch (final Throwable t) { - logger.warn("Unable to capture component metrics from Node heartbeats: " + t); - if (logger.isDebugEnabled()) { - logger.warn("", t); - } - } finally { - readLock.unlock("capture component metrics from node heartbeats"); - } - } - } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 6d09bf6..7ea0408 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,7 +16,40 @@ */ package org.apache.nifi.controller; -import com.sun.jersey.api.client.ClientHandlerException; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import 
java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.net.ssl.SSLContext; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.admin.service.AuditService; @@ -27,16 +60,13 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; -import org.apache.nifi.cluster.BulletinsPayload; import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeBulletins; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.connectable.Connectable; @@ -109,7 +139,6 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.events.NodeBulletinProcessingStrategy; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -182,39 +211,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import 
java.io.InputStream; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; +import com.sun.jersey.api.client.ClientHandlerException; public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider { @@ -306,7 +303,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * timer to periodically send heartbeats to the cluster */ - private ScheduledFuture<?> bulletinFuture; private ScheduledFuture<?> heartbeatGeneratorFuture; private ScheduledFuture<?> heartbeatSenderFuture; @@ -316,8 +312,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null); - private final AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber; - // guarded by rwLock /** * the node identifier; @@ -418,7 +412,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R 
counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); bulletinRepository = new VolatileBulletinRepository(); - nodeBulletinSubscriber = new AtomicReference<>(); try { this.provenanceEventRepository = createProvenanceRepository(properties); @@ -2953,8 +2946,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R stopHeartbeating(); - bulletinFuture = clusterTaskExecutor.scheduleWithFixedDelay(new BulletinsTask(protocolSender), 250, 2000, TimeUnit.MILLISECONDS); - final HeartbeatMessageGeneratorTask heartbeatMessageGeneratorTask = new HeartbeatMessageGeneratorTask(); heartbeatMessageGeneratorTaskRef.set(heartbeatMessageGeneratorTask); heartbeatGeneratorFuture = clusterTaskExecutor.scheduleWithFixedDelay(heartbeatMessageGeneratorTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS); @@ -3005,10 +2996,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (heartbeatSenderFuture != null) { heartbeatSenderFuture.cancel(false); } - - if (bulletinFuture != null) { - bulletinFuture.cancel(false); - } } finally { writeLock.unlock(); } @@ -3133,8 +3120,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // update the bulletin repository if (isChanging) { if (clustered) { - nodeBulletinSubscriber.set(new NodeBulletinProcessingStrategy()); - bulletinRepository.overrideDefaultBulletinProcessing(nodeBulletinSubscriber.get()); stateManagerProvider.enableClusterProvider(); if (zooKeeperStateServer != null) { @@ -3173,7 +3158,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L)); } } else { - bulletinRepository.restoreDefaultBulletinProcessing(); if (zooKeeperStateServer != null) { zooKeeperStateServer.shutdown(); } @@ -3472,6 +3456,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return replayFlowFile(record, requestor); } + 
@SuppressWarnings("deprecation") public ProvenanceEventRecord replayFlowFile(final ProvenanceEventRecord event, final String requestor) throws IOException { if (event == null) { throw new NullPointerException(); @@ -3627,110 +3612,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - private class BulletinsTask implements Runnable { - - private final NodeProtocolSender protocolSender; - private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US); - - public BulletinsTask(final NodeProtocolSender protocolSender) { - if (protocolSender == null) { - throw new IllegalArgumentException("NodeProtocolSender may not be null."); - } - this.protocolSender = protocolSender; - } - - @Override - public void run() { - try { - final NodeBulletinsMessage message = createBulletinsMessage(); - if (message == null) { - return; - } - - protocolSender.sendBulletins(message); - if (LOG.isDebugEnabled()) { - LOG.debug( - String.format( - "Sending bulletins to cluster manager at %s", - dateFormatter.format(new Date()))); - } - - } catch (final UnknownServiceAddressException usae) { - if (LOG.isDebugEnabled()) { - LOG.debug(usae.getMessage()); - } - } catch (final Exception ex) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to send bulletins to cluster manager due to: " + ex, ex); - } - } - } - - private boolean isIllegalXmlChar(final char c) { - return c < 0x20 && c != 0x09 && c != 0x0A && c != 0x0D; - } - - private boolean containsIllegalXmlChars(final Bulletin bulletin) { - final String message = bulletin.getMessage(); - for (int i = 0; i < message.length(); i++) { - final char c = message.charAt(i); - if (isIllegalXmlChar(c)) { - return true; - } - } - - return false; - } - - private String stripIllegalXmlChars(final String value) { - final StringBuilder sb = new StringBuilder(value.length()); - for (int i = 0; i < value.length(); i++) { - final char c = value.charAt(i); - sb.append(isIllegalXmlChar(c) ? 
'?' : c); - } - - return sb.toString(); - } - - private NodeBulletinsMessage createBulletinsMessage() { - final Set<Bulletin> nodeBulletins = nodeBulletinSubscriber.get().getBulletins(); - final Set<Bulletin> escapedNodeBulletins = new HashSet<>(nodeBulletins.size()); - - // ensure there are some bulletins to report - if (nodeBulletins.isEmpty()) { - return null; - } - - for (final Bulletin bulletin : nodeBulletins) { - final Bulletin escapedBulletin; - if (containsIllegalXmlChars(bulletin)) { - final String escapedBulletinMessage = stripIllegalXmlChars(bulletin.getMessage()); - - if (bulletin.getGroupId() == null) { - escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); - } else { - escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(), - bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); - } - } else { - escapedBulletin = bulletin; - } - - escapedNodeBulletins.add(escapedBulletin); - } - - // create the bulletin payload - final BulletinsPayload payload = new BulletinsPayload(); - payload.setBulletins(escapedNodeBulletins); - - // create bulletin message - final NodeBulletins bulletins = new NodeBulletins(getNodeId(), payload.marshal()); - final NodeBulletinsMessage message = new NodeBulletinsMessage(); - message.setBulletins(bulletins); - - return message; - } - } private class HeartbeatSendTask implements Runnable { @@ -3819,7 +3700,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R hbPayload.setCounters(getCounters()); hbPayload.setSystemDiagnostics(getSystemDiagnostics()); - hbPayload.setProcessGroupStatus(procGroupStatus); // create heartbeat message final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.isConnected(), hbPayload.marshal()); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
new file mode 100644
index 0000000..8b9f383
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Metrics that can be captured from a {@link ConnectionStatus}. Every constant wraps a
+ * {@link MetricDescriptor} built from a field name, a display label, a description, a
+ * {@link Formatter} and a {@link ValueMapper} that reads the value from the status object.
+ */
+public enum ConnectionStatusDescriptor {
+    INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "inputBytes",
+        "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getInputBytes();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "inputCount",
+        "FlowFiles In (5 mins)",
+        "The number of FlowFiles that were transferred to this Connection in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getInputCount());
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "outputBytes",
+        "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getOutputBytes();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "outputCount",
+        "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that were pulled from this Connection in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getOutputCount());
+            }
+        })),
+
+    QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "queuedBytes",
+        "Queued Bytes",
+        "The number of Bytes queued in this Connection",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getQueuedBytes();
+            }
+        })),
+
+    QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "queuedCount",
+        "Queued Count",
+        "The number of FlowFiles queued in this Connection",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getQueuedCount());
+            }
+        }));
+
+    // Assigned exactly once by the constructor; final so a constant's descriptor can never be swapped out.
+    private final MetricDescriptor<ConnectionStatus> descriptor;
+
+    /**
+     * @param descriptor the metric descriptor wrapped by this constant
+     */
+    private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * @return the field name reported by the wrapped descriptor
+     */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /**
+     * @return the metric descriptor wrapped by this constant
+     */
+    public MetricDescriptor<ConnectionStatus> getDescriptor() {
+        return descriptor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
new file mode 100644
index 0000000..d5325d0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Metrics that can be captured from a {@link ProcessGroupStatus}. Every constant wraps a
+ * {@link MetricDescriptor} built from a field name, a display label, a description, a
+ * {@link Formatter} and a {@link ValueMapper} that reads the value from the status object.
+ */
+public enum ProcessGroupStatusDescriptor {
+
+    BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)",
+        "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesRead();
+            }
+        })),
+
+    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)",
+        "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesWritten();
+            }
+        })),
+
+    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
+        "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesRead() + status.getBytesWritten();
+            }
+        })),
+
+    INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getInputContentSize();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)",
+        "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getInputCount().longValue();
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getOutputContentSize();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getOutputCount().longValue();
+            }
+        })),
+
+    QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
+        "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getQueuedContentSize();
+            }
+        })),
+
+    QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
+        "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getQueuedCount().longValue();
+            }
+        })),
+
+    TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)",
+        "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
+        Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return calculateTaskMillis(status);
+            }
+        }));
+
+    // Assigned exactly once by the constructor; final so a constant's descriptor can never be swapped out.
+    private final MetricDescriptor<ProcessGroupStatus> descriptor;
+
+    /**
+     * @param descriptor the metric descriptor wrapped by this constant
+     */
+    private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * @return the field name reported by the wrapped descriptor
+     */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /**
+     * @return the metric descriptor wrapped by this constant
+     */
+    public MetricDescriptor<ProcessGroupStatus> getDescriptor() {
+        return descriptor;
+    }
+
+    /**
+     * Totals the processing time of every Processor in the given group and in all of its
+     * descendant groups.
+     *
+     * @param status the status of the group to total
+     * @return the total processing time in milliseconds, truncated
+     */
+    private static long calculateTaskMillis(final ProcessGroupStatus status) {
+        // Convert exactly once, after the whole tree has been summed in nanoseconds. Converting
+        // inside the recursion would add millisecond sub-totals to a nanosecond accumulator.
+        return TimeUnit.MILLISECONDS.convert(calculateTaskNanos(status), TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Recursively sums the processing nanoseconds of the group's own Processors and of every
+     * nested group.
+     *
+     * @param status the status of the group to total
+     * @return the total processing time in nanoseconds
+     */
+    private static long calculateTaskNanos(final ProcessGroupStatus status) {
+        long nanos = 0L;
+
+        for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
+            nanos += procStatus.getProcessingNanos();
+        }
+
+        for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
+            nanos += calculateTaskNanos(childStatus);
+        }
+
+        return nanos;
+    }
+}
