http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index c0f4c63..303e98e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -29,8 +29,6 @@ import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -73,7 +71,6 @@ import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnRemoved; -import org.apache.nifi.cluster.BulletinsPayload; import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextImpl; @@ -88,6 +85,7 @@ import org.apache.nifi.cluster.manager.HttpClusterManager; import org.apache.nifi.cluster.manager.HttpRequestReplicator; import org.apache.nifi.cluster.manager.HttpResponseMapper; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.StatusMerger; import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException; import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; @@ -109,7 +107,6 @@ import org.apache.nifi.cluster.node.Node.Status; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeBulletins; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; @@ -121,7 +118,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; @@ -130,7 +126,6 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.StandardFlowSerializer; @@ -153,16 +148,14 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.SortedStateUtils; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; import org.apache.nifi.controller.status.history.MetricDescriptor; -import org.apache.nifi.controller.status.history.StatusHistory; +import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; +import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; import org.apache.nifi.controller.status.history.StatusHistoryUtil; import org.apache.nifi.controller.status.history.StatusSnapshot; -import org.apache.nifi.diagnostics.GarbageCollection; -import org.apache.nifi.diagnostics.StorageUsage; -import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; @@ -178,7 +171,6 @@ import org.apache.nifi.logging.NiFiLog; import org.apache.nifi.logging.ReportingTaskLogObserver; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.nar.NarThreadContextClassLoader; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.remote.RemoteResourceManager; @@ -188,7 +180,9 @@ import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol; import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; @@ -202,14 +196,18 @@ import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.OptimisticLockingManager; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UpdateRevision; +import org.apache.nifi.web.api.dto.BulletinBoardDTO; +import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.CountersDTO; import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; -import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO; +import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.QueueSizeDTO; @@ -219,30 +217,53 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.StateEntryDTO; import org.apache.nifi.web.api.dto.StateMapDTO; +import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; +import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; +import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.apache.nifi.web.api.entity.BulletinBoardEntity; import org.apache.nifi.web.api.entity.ComponentStateEntity; +import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.ControllerStatusEntity; +import org.apache.nifi.web.api.entity.CountersEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowSnippetEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; +import org.apache.nifi.web.api.entity.PortStatusEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProcessorStatusEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.ProvenanceEventEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.ReportingTasksEntity; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; +import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity; import org.apache.nifi.web.util.WebUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -310,10 +331,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C */ private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5; - public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository"; public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}"); + public static final Pattern PROCESSOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status"); public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/state"); public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}"); @@ -321,6 +342,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}"); public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))"); + public static final Pattern GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status"); + public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/status"); public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance"); public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance"); @@ -328,7 +351,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}"); public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+"); - public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}"); public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node"; public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}"); public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/state"); @@ -336,6 +358,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state"); + public static final Pattern BULLETIN_BOARD_URI_PATTERN = Pattern.compile("/nifi-api/controller/bulletin-board"); + public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = Pattern.compile("/nifi-api/system-diagnostics"); + public static final Pattern COUNTERS_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters"); + public static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}"); + + public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = + Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history"); + public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history"); + public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern + .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status/history"); + public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern + .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status/history"); + + public static final Pattern CONNECTION_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status"); + public static final Pattern INPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/input-ports/[a-f0-9\\-]{36}/status"); + public static final Pattern OUTPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/output-ports/[a-f0-9\\-]{36}/status"); + public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN = + Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status"); @Deprecated public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); @@ -378,7 +418,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C private final BulletinRepository bulletinRepository; private final String instanceId; private final FlowEngine reportingTaskEngine; - private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>(); private final StandardProcessScheduler processScheduler; private final StateManagerProvider stateManagerProvider; private final long componentStatusSnapshotMillis; @@ -451,11 +490,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(new Heartbeater() { - @Override - public void heartbeat() { - } - }, this, encryptor, stateManagerProvider); + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider); // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only // going to be scheduling Reporting Tasks. Otherwise, it would not be okay. @@ -463,7 +498,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor)); processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); - processScheduler.scheduleFrameworkTask(new CaptureComponentMetrics(), "Capture Component Metrics", componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS); controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); } @@ -620,7 +654,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.HEARTBEAT == msg.getType() || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType() - || MessageType.BULLETINS == msg.getType() || MessageType.RECONNECTION_FAILURE == msg.getType(); } @@ -654,10 +687,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start(); return null; - case BULLETINS: - final NodeBulletinsMessage bulletinsMessage = (NodeBulletinsMessage) protocolMessage; - handleBulletins(bulletinsMessage.getBulletins()); - return null; default: throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType()); } @@ -1686,22 +1715,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C processScheduler.enableReportingTask(reportingTask); } - /** - * Handle a bulletins message. - * - * @param bulletins bulletins - */ - public void handleBulletins(final NodeBulletins bulletins) { - final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier(); - final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); - - // unmarshal the message - final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload()); - for (final Bulletin bulletin : payload.getBulletins()) { - bulletin.setNodeAddress(nodeAddress); - bulletinRepository.addBulletin(bulletin); - } - } /** * Handles a node's heartbeat. If this heartbeat is a node's first heartbeat since its connection request, then the manager will mark the node as connected. If the node was previously disconnected @@ -1875,20 +1888,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } - private ComponentStatusRepository createComponentStatusRepository() { - final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); - if (implementationClassName == null) { - throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); - } - - try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - @Override public Set<Node> getNodes(final Status... statuses) { final Set<Status> desiredStatusSet = new HashSet<>(); @@ -2434,6 +2433,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } + private static boolean isProcessorStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isProcessorStateEndpoint(final URI uri, final String method) { return "GET".equalsIgnoreCase(method) && PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); } @@ -2442,6 +2445,30 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches(); } + private static boolean isConnectionStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isInputPortStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && INPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isOutputPortStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && OUTPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isRemoteProcessGroupStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isGroupStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isControllerStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isTemplateEndpoint(final URI uri, final String method) { return "POST".equalsIgnoreCase(method) && TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches(); } @@ -2454,6 +2481,35 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches(); } + private static boolean isProcessorStatusHistoryEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isProcessGroupStatusHistoryEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isRemoteProcessGroupStatusHistoryEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isConnectionStatusHistoryEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isBulletinBoardEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isSystemDiagnosticsEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isCountersEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isRemoteProcessGroupEndpoint(final URI uri, final String method) { if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) { return true; @@ -2487,8 +2543,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } - private static boolean isCountersEndpoint(final URI uri) { - return COUNTERS_URI.matcher(uri.getPath()).matches(); + private static boolean isCounterEndpoint(final URI uri) { + return COUNTER_URI_PATTERN.matcher(uri.getPath()).matches(); } private static boolean isControllerServicesEndpoint(final URI uri, final String method) { @@ -2556,7 +2612,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method) - || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method); + || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method) + || isGroupStatusEndpoint(uri, method) || isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) + || isConnectionStatusEndpoint(uri, method) || isRemoteProcessGroupStatusEndpoint(uri, method) + || isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method) + || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method) + || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method) + || isBulletinBoardEndpoint(uri, method) || isSystemDiagnosticsEndpoint(uri, method) + || isCountersEndpoint(uri, method); } private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) { @@ -2608,6 +2671,303 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C componentState.getLocalState().setState(localStateEntries); } + + private void mergeSystemDiagnostics(final SystemDiagnosticsDTO target, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, SystemDiagnosticsDTO> resultMap) { + final SystemDiagnosticsDTO mergedSystemDiagnostics = target; + mergedSystemDiagnostics.setNodeSnapshots(new ArrayList<NodeSystemDiagnosticsSnapshotDTO>()); + + final NodeSystemDiagnosticsSnapshotDTO selectedNodeSnapshot = new NodeSystemDiagnosticsSnapshotDTO(); + selectedNodeSnapshot.setSnapshot(target.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedSystemDiagnostics.getNodeSnapshots().add(selectedNodeSnapshot); + + for (final Map.Entry<NodeIdentifier, SystemDiagnosticsDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final SystemDiagnosticsDTO toMerge = entry.getValue(); + if (toMerge == target) { + continue; + } + + StatusMerger.merge(mergedSystemDiagnostics, toMerge, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergeCounters(final CountersDTO target, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, CountersDTO> resultMap) { + final CountersDTO mergedCounters = target; + mergedCounters.setNodeSnapshots(new ArrayList<NodeCountersSnapshotDTO>()); + + final NodeCountersSnapshotDTO selectedNodeSnapshot = new NodeCountersSnapshotDTO(); + selectedNodeSnapshot.setSnapshot(target.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedCounters.getNodeSnapshots().add(selectedNodeSnapshot); + + for (final Map.Entry<NodeIdentifier, CountersDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final CountersDTO toMerge = entry.getValue(); + if (toMerge == target) { + continue; + } + + StatusMerger.merge(mergedCounters, toMerge, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) { + final ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto; + mergedProcessGroupStatus.setNodeSnapshots(new ArrayList<NodeProcessGroupStatusSnapshotDTO>()); + + final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessGroupStatusDTO nodeProcessGroupStatus = entry.getValue(); + if (nodeProcessGroupStatus == mergedProcessGroupStatus) { + continue; + } + + final ProcessGroupStatusSnapshotDTO nodeSnapshot = nodeProcessGroupStatus.getAggregateSnapshot(); + for (final RemoteProcessGroupStatusSnapshotDTO remoteProcessGroupStatus : nodeSnapshot.getRemoteProcessGroupStatusSnapshots()) { + final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); + if (!nodeAuthorizationIssues.isEmpty()) { + for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { + final String Issue = iter.next(); + iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue); + } + remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues); + } + } + + StatusMerger.merge(mergedProcessGroupStatus, nodeProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + + private void mergeProcessorStatus(final ProcessorStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessorStatusDTO> resultMap) { + final ProcessorStatusDTO mergedProcessorStatus = statusDto; + mergedProcessorStatus.setNodeSnapshots(new ArrayList<NodeProcessorStatusSnapshotDTO>()); + + final NodeProcessorStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessorStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedProcessorStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessorStatusDTO nodeProcessorStatus = entry.getValue(); + if (nodeProcessorStatus == statusDto) { + continue; + } + + StatusMerger.merge(mergedProcessorStatus, nodeProcessorStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergeConnectionStatus(final ConnectionStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ConnectionStatusDTO> resultMap) { + final ConnectionStatusDTO mergedConnectionStatus = statusDto; + mergedConnectionStatus.setNodeSnapshots(new ArrayList<NodeConnectionStatusSnapshotDTO>()); + + final NodeConnectionStatusSnapshotDTO selectedNodeSnapshot = new NodeConnectionStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedConnectionStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ConnectionStatusDTO nodeConnectionStatus = entry.getValue(); + if (nodeConnectionStatus == statusDto) { + continue; + } + + StatusMerger.merge(mergedConnectionStatus, nodeConnectionStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergePortStatus(final PortStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, PortStatusDTO> resultMap) { + final PortStatusDTO mergedPortStatus = statusDto; + mergedPortStatus.setNodeSnapshots(new ArrayList<NodePortStatusSnapshotDTO>()); + + final NodePortStatusSnapshotDTO selectedNodeSnapshot = new NodePortStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedPortStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, PortStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final PortStatusDTO nodePortStatus = entry.getValue(); + if (nodePortStatus == statusDto) { + continue; + } + + StatusMerger.merge(mergedPortStatus, nodePortStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergeRemoteProcessGroupStatus(final RemoteProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultMap) { + final RemoteProcessGroupStatusDTO mergedRemoteProcessGroupStatus = statusDto; + mergedRemoteProcessGroupStatus.setNodeSnapshots(new ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>()); + + final NodeRemoteProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedRemoteProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final RemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatus = entry.getValue(); + if (nodeRemoteProcessGroupStatus == statusDto) { + continue; + } + + StatusMerger.merge(mergedRemoteProcessGroupStatus, nodeRemoteProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergeControllerStatus(final ControllerStatusDTO statusDto, final Map<NodeIdentifier, ControllerStatusDTO> resultMap) { + ControllerStatusDTO mergedStatus = statusDto; + for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ControllerStatusDTO nodeStatus = entry.getValue(); + + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + for (final BulletinDTO bulletin : nodeStatus.getBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + for (final BulletinDTO bulletin : nodeStatus.getControllerServiceBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + for (final BulletinDTO bulletin : nodeStatus.getReportingTaskBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + + if (nodeStatus == mergedStatus) { + continue; + } + + StatusMerger.merge(mergedStatus, nodeStatus); + } + + final int totalNodeCount = getNodeIds().size(); + final int connectedNodeCount = getNodeIds(Status.CONNECTED).size(); + + final List<Bulletin> ncmControllerBulletins = getBulletinRepository().findBulletinsForController(); + mergedStatus.setBulletins(mergeNCMBulletins(mergedStatus.getBulletins(), ncmControllerBulletins)); + + // get the controller service bulletins + final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); + final List<Bulletin> ncmServiceBulletins = getBulletinRepository().findBulletins(controllerServiceQuery); + mergedStatus.setControllerServiceBulletins(mergeNCMBulletins(mergedStatus.getControllerServiceBulletins(), ncmServiceBulletins)); + + // get the reporting task bulletins + final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); + final List<Bulletin> ncmReportingTaskBulletins = getBulletinRepository().findBulletins(reportingTaskQuery); + mergedStatus.setReportingTaskBulletins(mergeNCMBulletins(mergedStatus.getReportingTaskBulletins(), ncmReportingTaskBulletins)); + + mergedStatus.setConnectedNodeCount(connectedNodeCount); + mergedStatus.setTotalNodeCount(totalNodeCount); + StatusMerger.updatePrettyPrintedFields(mergedStatus); + } + + private List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> nodeBulletins, final List<Bulletin> ncmBulletins) { + if (ncmBulletins == null || ncmBulletins.isEmpty()) { + return nodeBulletins; + } + + final List<BulletinDTO> mergedBulletins = new ArrayList<>(nodeBulletins.size() + ncmBulletins.size()); + mergedBulletins.addAll(nodeBulletins); + mergedBulletins.addAll(createBulletinDtos(ncmBulletins)); + return mergedBulletins; + } + + private void mergeBulletinBoard(final BulletinBoardDTO nodeBulletinBoard, final Map<NodeIdentifier, BulletinBoardDTO> resultMap) { + final List<BulletinDTO> bulletinDtos = new ArrayList<>(); + for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final BulletinBoardDTO boardDto = entry.getValue(); + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + + for (final BulletinDTO bulletin : boardDto.getBulletins()) { + bulletin.setNodeAddress(nodeAddress); + bulletinDtos.add(bulletin); + } + } + + Collections.sort(bulletinDtos, new Comparator<BulletinDTO>() { + @Override + public int compare(final BulletinDTO o1, final BulletinDTO o2) { + final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp()); + if (timeComparison != 0) { + return timeComparison; + } + + return o1.getNodeAddress().compareTo(o2.getNodeAddress()); + } + }); + + nodeBulletinBoard.setBulletins(bulletinDtos); + } + + /** + * Creates BulletinDTOs for the specified Bulletins. + * + * @param bulletins bulletin + * @return dto + */ + public List<BulletinDTO> createBulletinDtos(final List<Bulletin> bulletins) { + final List<BulletinDTO> bulletinDtos = new ArrayList<>(bulletins.size()); + for (final Bulletin bulletin : bulletins) { + bulletinDtos.add(createBulletinDto(bulletin)); + } + return bulletinDtos; + } + + /** + * Creates a BulletinDTO for the specified Bulletin. + * + * @param bulletin bulletin + * @return dto + */ + public BulletinDTO createBulletinDto(final Bulletin bulletin) { + final BulletinDTO dto = new BulletinDTO(); + dto.setId(bulletin.getId()); + dto.setNodeAddress(bulletin.getNodeAddress()); + dto.setTimestamp(bulletin.getTimestamp()); + dto.setGroupId(bulletin.getGroupId()); + dto.setSourceId(bulletin.getSourceId()); + dto.setSourceName(bulletin.getSourceName()); + dto.setCategory(bulletin.getCategory()); + dto.setLevel(bulletin.getLevel()); + dto.setMessage(bulletin.getMessage()); + return dto; + } + private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) { final ProvenanceResultsDTO results = provenanceDto.getResults(); final ProvenanceRequestDTO request = provenanceDto.getRequest(); @@ -3545,6 +3905,252 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C mergeComponentState(componentState, resultsMap); clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isGroupStatusEndpoint(uri, method)) { + final ProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class); + final ProcessGroupStatusDTO statusRequest = responseEntity.getProcessGroupStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, ProcessGroupStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ProcessGroupStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class); + } + + final ProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getProcessGroupStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeGroupStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isProcessorStatusEndpoint(uri, method)) { + final ProcessorStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorStatusEntity.class); + final ProcessorStatusDTO statusRequest = responseEntity.getProcessorStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, ProcessorStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ProcessorStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessorStatusEntity.class); + } + + final ProcessorStatusDTO nodeStatus = nodeResponseEntity.getProcessorStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeProcessorStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isConnectionStatusEndpoint(uri, method)) { + final ConnectionStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ConnectionStatusEntity.class); + final ConnectionStatusDTO statusRequest = responseEntity.getConnectionStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, ConnectionStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ConnectionStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ConnectionStatusEntity.class); + } + + final ConnectionStatusDTO nodeStatus = nodeResponseEntity.getConnectionStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeConnectionStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && (isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method))) { + final PortStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(PortStatusEntity.class); + final PortStatusDTO statusRequest = responseEntity.getPortStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, PortStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final PortStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(PortStatusEntity.class); + } + + final PortStatusDTO nodeStatus = nodeResponseEntity.getPortStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergePortStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isRemoteProcessGroupStatusEndpoint(uri, method)) { + final RemoteProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupStatusEntity.class); + final RemoteProcessGroupStatusDTO statusRequest = responseEntity.getRemoteProcessGroupStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final RemoteProcessGroupStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(RemoteProcessGroupStatusEntity.class); + } + + final RemoteProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getRemoteProcessGroupStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeRemoteProcessGroupStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isControllerStatusEndpoint(uri, method)) { + final ControllerStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerStatusEntity.class); + final ControllerStatusDTO statusRequest = responseEntity.getControllerStatus(); + + final Map<NodeIdentifier, ControllerStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ControllerStatusEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerStatusEntity.class); + final ControllerStatusDTO nodeStatus = nodeResponseEntity.getControllerStatus(); + + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeControllerStatus(statusRequest, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isBulletinBoardEndpoint(uri, method)) { + final BulletinBoardEntity responseEntity = clientResponse.getClientResponse().getEntity(BulletinBoardEntity.class); + final BulletinBoardDTO responseDto = responseEntity.getBulletinBoard(); + + final Map<NodeIdentifier, BulletinBoardDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final BulletinBoardEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(BulletinBoardEntity.class); + final BulletinBoardDTO nodeStatus = nodeResponseEntity.getBulletinBoard(); + + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeBulletinBoard(responseDto, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isProcessorStatusHistoryEndpoint(uri, method)) { + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + + clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors); + } else if (hasSuccessfulClientResponse && isConnectionStatusHistoryEndpoint(uri, method)) { + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + + clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors); + } else if (hasSuccessfulClientResponse && isProcessGroupStatusHistoryEndpoint(uri, method)) { + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + + clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors); + } else if (hasSuccessfulClientResponse && isRemoteProcessGroupStatusHistoryEndpoint(uri, method)) { + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + + clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors); + } else if (hasSuccessfulClientResponse && isSystemDiagnosticsEndpoint(uri, method)) { + final SystemDiagnosticsEntity responseEntity = clientResponse.getClientResponse().getEntity(SystemDiagnosticsEntity.class); + final SystemDiagnosticsDTO responseDto = responseEntity.getSystemDiagnostics(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, SystemDiagnosticsDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final SystemDiagnosticsEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(SystemDiagnosticsEntity.class); + } + + final SystemDiagnosticsDTO nodeStatus = nodeResponseEntity.getSystemDiagnostics(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeSystemDiagnostics(responseDto, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isCountersEndpoint(uri, method)) { + final CountersEntity responseEntity = clientResponse.getClientResponse().getEntity(CountersEntity.class); + final CountersDTO responseDto = responseEntity.getCounters(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, CountersDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final CountersEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(CountersEntity.class); + } + + final CountersDTO nodeStatus = nodeResponseEntity.getCounters(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeCounters(responseDto, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); } else { if (!nodeResponsesToDrain.isEmpty()) { drainResponses(nodeResponsesToDrain); @@ -3603,6 +4209,49 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return clientResponse; } + + private NodeResponse mergeStatusHistoryResponses(NodeResponse clientResponse, Map<Node, NodeResponse> updatedNodesMap, Set<NodeResponse> problematicNodeResponses, + Map<String, MetricDescriptor<?>> metricDescriptors) { + final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class); + + StatusHistoryDTO lastStatusHistory = null; + final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new ArrayList<>(updatedNodesMap.size()); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final StatusHistoryEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(StatusHistoryEntity.class); + final StatusHistoryDTO nodeStatus = nodeResponseEntity.getStatusHistory(); + lastStatusHistory = nodeStatus; + + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO(); + nodeStatusSnapshot.setNodeId(nodeId.getId()); + nodeStatusSnapshot.setAddress(nodeId.getApiAddress()); + nodeStatusSnapshot.setApiPort(nodeId.getApiPort()); + nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots()); + nodeStatusSnapshots.add(nodeStatusSnapshot); + } + + final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); + clusterStatusHistory.setAggregateSnapshots(mergeStatusHistories(nodeStatusSnapshots, metricDescriptors)); + clusterStatusHistory.setGenerated(new Date()); + clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots); + if (lastStatusHistory != null) { + clusterStatusHistory.setComponentDetails(lastStatusHistory.getComponentDetails()); + clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors()); + } + + final StatusHistoryEntity clusterEntity = new StatusHistoryEntity(); + clusterEntity.setStatusHistory(clusterStatusHistory); + clusterEntity.setRevision(responseEntity.getRevision()); + + return new NodeResponse(clientResponse, clusterEntity); + } + + + /** * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and is not comprised of responses from all nodes in the cluster (at least * one node contained the counter in question). @@ -3612,7 +4261,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C * @return Whether all problematic node responses were due to a missing counter */ private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final URI uri) { - if (isCountersEndpoint(uri)) { + if (isCounterEndpoint(uri)) { boolean notFound = true; for (final NodeResponse problematicResponse : problematicNodeResponses) { if (problematicResponse.getStatus() != 404) { @@ -4026,207 +4675,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return bulletinRepository; } - @Override - public ProcessGroupStatus getProcessGroupStatus(final String groupId) { - final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED); - - // ensure there are some nodes in the cluster - if (connectedNodes.isEmpty()) { - throw new NoConnectedNodesException(); - } - - ProcessGroupStatus mergedProcessGroupStatus = null; - for (final Node node : connectedNodes) { - final NodeIdentifier nodeId = node.getNodeId(); - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - final ProcessGroupStatus nodeRootProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus(); - final ProcessGroupStatus nodeProcessGroupStatus = groupId.equals(ROOT_GROUP_ID_ALIAS) ? nodeRootProcessGroupStatus : getProcessGroupStatus(nodeRootProcessGroupStatus, groupId); - if (nodeProcessGroupStatus == null) { - continue; - } - - if (mergedProcessGroupStatus == null) { - mergedProcessGroupStatus = nodeProcessGroupStatus.clone(); - - // update any issues with the node label - if (mergedProcessGroupStatus.getRemoteProcessGroupStatus() != null) { - for (final RemoteProcessGroupStatus remoteProcessGroupStatus : mergedProcessGroupStatus.getRemoteProcessGroupStatus()) { - final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); - if (!nodeAuthorizationIssues.isEmpty()) { - for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { - final String Issue = iter.next(); - iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue); - } - remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues); - } - } - } - } else { - final ProcessGroupStatus nodeClone = nodeProcessGroupStatus.clone(); - for (final RemoteProcessGroupStatus remoteProcessGroupStatus : nodeClone.getRemoteProcessGroupStatus()) { - final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); - if (!nodeAuthorizationIssues.isEmpty()) { - for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { - final String Issue = iter.next(); - iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue); - } - remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues); - } - } - - ProcessGroupStatus.merge(mergedProcessGroupStatus, nodeClone); - } - } - - return mergedProcessGroupStatus; - } - - private ProcessGroupStatus getProcessGroupStatus(final ProcessGroupStatus parent, final String groupId) { - if (parent.getId().equals(groupId)) { - return parent; - } - - for (final ProcessGroupStatus child : parent.getProcessGroupStatus()) { - final ProcessGroupStatus matching = getProcessGroupStatus(child, groupId); - if (matching != null) { - return matching; - } - } - - return null; - } - - @Override - public SystemDiagnostics getSystemDiagnostics() { - final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED); - - // ensure there are some nodes... - if (connectedNodes.isEmpty()) { - throw new NoConnectedNodesException(); - } - - SystemDiagnostics clusterDiagnostics = null; - for (final Node node : connectedNodes) { - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - final SystemDiagnostics nodeDiagnostics = nodeHeartbeatPayload.getSystemDiagnostics(); - if (nodeDiagnostics == null) { - continue; - } - - if (clusterDiagnostics == null) { - clusterDiagnostics = nodeDiagnostics.clone(); - } else { - merge(clusterDiagnostics, nodeDiagnostics); - } - } - - return clusterDiagnostics; - } - - private void merge(final SystemDiagnostics target, final SystemDiagnostics sd) { - - // threads - target.setDaemonThreads(target.getDaemonThreads() + sd.getDaemonThreads()); - target.setTotalThreads(target.getTotalThreads() + sd.getTotalThreads()); - - // heap - target.setTotalHeap(target.getTotalHeap() + sd.getTotalHeap()); - target.setUsedHeap(target.getUsedHeap() + sd.getUsedHeap()); - target.setMaxHeap(target.getMaxHeap() + sd.getMaxHeap()); - - // non heap - target.setTotalNonHeap(target.getTotalNonHeap() + sd.getTotalNonHeap()); - target.setUsedNonHeap(target.getUsedNonHeap() + sd.getUsedNonHeap()); - target.setMaxNonHeap(target.getMaxNonHeap() + sd.getMaxNonHeap()); - - // processors - target.setAvailableProcessors(target.getAvailableProcessors() + sd.getAvailableProcessors()); - - // load - if (sd.getProcessorLoadAverage() != null) { - if (target.getProcessorLoadAverage() != null) { - target.setProcessorLoadAverage(target.getProcessorLoadAverage() + sd.getProcessorLoadAverage()); - } else { - target.setProcessorLoadAverage(sd.getProcessorLoadAverage()); - } - } - - // db disk usage - merge(target.getFlowFileRepositoryStorageUsage(), sd.getFlowFileRepositoryStorageUsage()); - - // repo disk usage - final Map<String, StorageUsage> targetContentRepoMap; - if (target.getContentRepositoryStorageUsage() == null) { - targetContentRepoMap = new LinkedHashMap<>(); - target.setContentRepositoryStorageUsage(targetContentRepoMap); - } else { - targetContentRepoMap = target.getContentRepositoryStorageUsage(); - } - if (sd.getContentRepositoryStorageUsage() != null) { - for (final Map.Entry<String, StorageUsage> sdEntry : sd.getContentRepositoryStorageUsage().entrySet()) { - final StorageUsage mergedDiskUsage = targetContentRepoMap.get(sdEntry.getKey()); - if (mergedDiskUsage == null) { - targetContentRepoMap.put(sdEntry.getKey(), sdEntry.getValue()); - } else { - merge(mergedDiskUsage, sdEntry.getValue()); - } - } - } - - // garbage collection - final Map<String, GarbageCollection> targetGarbageCollection; - if (target.getGarbageCollection() == null) { - targetGarbageCollection = new LinkedHashMap<>(); - target.setGarbageCollection(targetGarbageCollection); - } else { - targetGarbageCollection = target.getGarbageCollection(); - } - if (sd.getGarbageCollection() != null) { - for (final Map.Entry<String, GarbageCollection> gcEntry : sd.getGarbageCollection().entrySet()) { - final GarbageCollection mergedGarbageCollection = targetGarbageCollection.get(gcEntry.getKey()); - if (mergedGarbageCollection == null) { - targetGarbageCollection.put(gcEntry.getKey(), gcEntry.getValue().clone()); - } else { - merge(mergedGarbageCollection, gcEntry.getValue()); - } - } - } - } - - private void merge(final StorageUsage target, final StorageUsage du) { - target.setFreeSpace(target.getFreeSpace() + du.getFreeSpace()); - target.setTotalSpace(target.getTotalSpace() + du.getTotalSpace()); - } - - private void merge(final GarbageCollection target, final GarbageCollection gc) { - target.setCollectionCount(target.getCollectionCount() + gc.getCollectionCount()); - target.setCollectionTime(target.getCollectionTime() + gc.getCollectionTime()); - } public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) { final long time = toNormalize.getTime(); return new Date(time - time % numMillis); } - private NodeDTO createNodeDTO(final Node node) { - final NodeDTO nodeDto = new NodeDTO(); - final NodeIdentifier nodeId = node.getNodeId(); - nodeDto.setNodeId(nodeId.getId()); - nodeDto.setAddress(nodeId.getApiAddress()); - nodeDto.setApiPort(nodeId.getApiPort()); - nodeDto.setStatus(node.getStatus().name()); - nodeDto.setPrimary(node.equals(getPrimaryNode())); - final Date connectionRequested = new Date(node.getConnectionRequestedTimestamp()); - nodeDto.setConnectionRequested(connectionRequested); - - return nodeDto; - } private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> snapshotsToAggregate) { // Aggregate the snapshots @@ -4245,278 +4699,65 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return aggregatedSnapshotDtos; } - public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId) { - return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE); - } - - public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startDate, final Date endDate, final int preferredDataPoints) { - final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); - StatusHistoryDTO lastStatusHistory = null; - final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>(); - final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); + private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, final Map<String, MetricDescriptor<?>> metricDescriptors) { + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); + snapshot.setTimestamp(snapshotDto.getTimestamp()); - for (final Node node : getRawNodes()) { - final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - continue; - } + final Map<String, Long> metrics = snapshotDto.getStatusMetrics(); + for (final Map.Entry<String, Long> entry : metrics.entrySet()) { + final String metricId = entry.getKey(); + final Long value = entry.getValue(); - final StatusHistory statusHistory = statusRepository.getProcessorStatusHistory(processorId, startDate, endDate, preferredDataPoints); - if (statusHistory == null) { - continue; - } - - processorDescriptors.addAll(statusRepository.getProcessorMetricDescriptors()); - - // record the status history (last) to get the component details for use later - final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory); - lastStatusHistory = statusHistoryDto; - - final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO(); - nodeHistory.setStatusHistory(statusHistoryDto); - nodeHistory.setNode(createNodeDTO(node)); - nodeHistories.add(nodeHistory); - - // collect all of the snapshots to aggregate - for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { - final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); - List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); - if (snapshots == null) { - snapshots = new ArrayList<>(); - snapshotsToAggregate.put(normalizedDate, snapshots); - } - snapshots.add(snapshot); + final MetricDescriptor<?> descriptor = metricDescriptors.get(metricId); + if (descriptor != null) { + snapshot.addStatusMetric(descriptor, value); } } - // Aggregate the snapshots - final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate); - - // get the details for this component from the last status history - final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>(); - clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails()); - - final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); - clusterStatusHistory.setGenerated(new Date()); - clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processorDescriptors)); - clusterStatusHistory.setDetails(clusterStatusHistoryDetails); - clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos); - - final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO(); - history.setGenerated(new Date()); - history.setNodeStatusHistory(nodeHistories); - history.setClusterStatusHistory(clusterStatusHistory); - return history; + return snapshot; } - public StatusHistoryDTO createStatusHistoryDto(final StatusHistory statusHistory) { - final StatusHistoryDTO dto = new StatusHistoryDTO(); - - dto.setDetails(new LinkedHashMap<>(statusHistory.getComponentDetails())); - dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(statusHistory)); - dto.setGenerated(statusHistory.getDateGenerated()); + private List<StatusSnapshotDTO> mergeStatusHistories(final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots, final Map<String, MetricDescriptor<?>> metricDescriptors) { + // We want a Map<Date, List<StatusSnapshot>>, which is a Map of "normalized Date" (i.e., a time range, essentially) + // to all Snapshots for that time. The list will contain one snapshot for each node. However, we can have the case + // where the NCM has a different value for the componentStatusSnapshotMillis than the nodes have. In this case, + // we end up with multiple entries in the List<StatusSnapshot> for the same node/timestamp, which skews our aggregate + // results. In order to avoid this, we will use only the latest snapshot for a node that falls into the the time range + // of interest. + // To accomplish this, we have an intermediate data structure, which is a Map of "normalized Date" to an inner Map + // of Node Identifier to StatusSnapshot. We then will flatten this Map and aggregate the results. + final Map<Date, Map<String, StatusSnapshot>> dateToNodeSnapshots = new TreeMap<>(); - final List<StatusSnapshotDTO> statusSnapshots = new ArrayList<>(); - for (final StatusSnapshot statusSnapshot : statusHistory.getStatusSnapshots()) { - statusSnapshots.add(StatusHistoryUtil.createStatusSnapshotDto(statusSnapshot)); - } - dto.setStatusSnapshots(statusSnapshots); - - return dto; - } - - public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId) { - return getConnectionStatusHistory(connectionId, null, null, Integer.MAX_VALUE); - } - - public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId, final Date startDate, final Date endDate, final int preferredDataPoints) { - final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); - - StatusHistoryDTO lastStatusHistory = null; - final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>(); - final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); - - for (final Node node : getRawNodes()) { - final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - continue; - } - - final StatusHistory statusHistory = statusRepository.getConnectionStatusHistory(connectionId, startDate, endDate, preferredDataPoints); - if (statusHistory == null) { - continue; - } - - final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory); - // record the status history (last) to get the componet details for use later - lastStatusHistory = statusHistoryDto; - connectionDescriptors.addAll(statusRepository.getConnectionMetricDescriptors()); - - final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO(); - nodeHistory.setStatusHistory(statusHistoryDto); - nodeHistory.setNode(createNodeDTO(node)); - nodeHistories.add(nodeHistory); - - // collect all of the snapshots to aggregate - for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { + // group status snapshot's for each node by date + for (final NodeStatusSnapshotsDTO nodeStatusSnapshot : nodeStatusSnapshots) { + for (final StatusSnapshotDTO snapshotDto : nodeStatusSnapshot.getStatusSnapshots()) { + final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors); final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); - List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); - if (snapshots == null) { - snapshots = new ArrayList<>(); - snapshotsToAggregate.put(normalizedDate, snapshots); - } - snapshots.add(snapshot); - } - } - - // Aggregate the snapshots - final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate); - - // get the details for this component from the last status history - final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>(); - clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails()); - - final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); - clusterStatusHistory.setGenerated(new Date()); - clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(connectionDescriptors)); - clusterStatusHistory.setDetails(clusterStatusHistoryDetails); - clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos); - - final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO(); - history.setGenerated(new Date()); - history.setNodeStatusHistory(nodeHistories); - history.setClusterStatusHistory(clusterStatusHistory); - return history; - } - - public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) { - return getProcessGroupStatusHistory(processGroupId, null, null, Integer.MAX_VALUE); - } - - public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) { - final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); - StatusHistoryDTO lastStatusHistory = null; - final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>(); - final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); - - for (final Node node : getRawNodes()) { - final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - continue; - } - - final StatusHistory statusHistory = statusRepository.getProcessGroupStatusHistory(processGroupId, startDate, endDate, preferredDataPoints); - if (statusHistory == null) { - continue; - } - - final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory); - // record the status history (last) to get the componet details for use later - lastStatusHistory = statusHistoryDto; - processGroupDescriptors.addAll(statusRepository.getProcessGroupMetricDescriptors()); - - final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO(); - nodeHistory.setStatusHistory(statusHistoryDto); - nodeHistory.setNode(createNodeDTO(node)); - nodeHistories.add(nodeHistory); - - // collect all of the snapshots to aggregate - for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { - final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); - List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); - if (snapshots == null) { - snapshots = new ArrayList<>(); - snapshotsToAggregate.put(normalizedDate, snapshots); + Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate); + if (nodeToSnapshotMap == null) { + nodeToSnapshotMap = new HashMap<>(); + dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap); } - snapshots.add(snapshot); + nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot); } } - // Aggregate the snapshots - final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate); - - // get the details for this component from the last status history - final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>(); - clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails()); - - final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); - clusterStatusHistory.setGenerated(new Date()); - clusterStatusHistory.setDetails(clusterStatusHistoryDetails); - clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processGroupDescriptors)); - clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos); - - final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO(); - history.setGenerated(new Date()); - history.setNodeStatusHistory(nodeHistories); - history.setClusterStatusHistory(clusterStatusHistory); - return history; - } - - public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId) { - return getRemoteProcessGroupStatusHistory(remoteGroupId, null, null, Integer.MAX_VALUE); - } - - public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) { - final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); - - StatusHistoryDTO lastStatusHistory = null; - final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>(); + // aggregate the snapshots by (normalized) timestamp final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); - - for (final Node node : getRawNodes()) { - final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - continue; - } - - final StatusHistory statusHistory = statusRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startDate, endDate, preferredDataPoints); - if (statusHistory == null) { - continue; - } - - final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory); - // record the status history (last) to get the componet details for use later - lastStatusHistory = statusHistoryDto; - remoteProcessGroupDescriptors.addAll(statusRepository.getRemoteProcessGroupMetricDescriptors()); - - final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO(); - nodeHistory.setStatusHistory(statusHistoryDto); - nodeHistory.setNode(createNodeDTO(node)); - nodeHistories.add(nodeHistory); - - // collect all of the snapshots to aggregate - for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { - final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); - List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate); - if (snapshots == null) { - snapshots = new ArrayList<>(); - snapshotsToAggregate.put(normalizedDate, snapshots); - } - snapshots.add(snapshot); - } + for (final Map.Entry<Date, Map<String, StatusSnapshot>> entry : dateToNodeSnapshots.entrySet()) { + final Date normalizedDate = entry.getKey(); + final Map<String, StatusSnapshot> nodeToSnapshot = entry.getValue(); + final List<StatusSnapshot> snapshotsForTimestamp = new ArrayList<>(nodeToSnapshot.values()); + snapshotsToAggregate.put(normalizedDate, snapshotsForTimestamp); } - // Aggregate the snapshots - final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate); + final List<StatusSnapshotDTO> aggregatedSnapshots = aggregate(snapshotsToAggregate); + return aggregatedSnapshots; + } - // get the details for this comp
<TRUNCATED>
