http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index fbf400b..dbcb8fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -19,23 +19,17 @@ package org.apache.nifi.cluster.manager.impl; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.Serializable; import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.ListIterator; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -50,8 +44,6 @@ import java.util.regex.Pattern; import javax.net.ssl.SSLContext; import javax.ws.rs.HttpMethod; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.StreamingOutput; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import 
javax.xml.parsers.ParserConfigurationException; @@ -71,6 +63,12 @@ import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextImpl; import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor; import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; +import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; +import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; +import org.apache.nifi.cluster.coordination.http.replication.AsyncClusterResponse; +import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback; +import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.event.EventManager; @@ -80,10 +78,7 @@ import org.apache.nifi.cluster.flow.DaoException; import org.apache.nifi.cluster.flow.DataFlowManagementService; import org.apache.nifi.cluster.flow.PersistedFlowState; import org.apache.nifi.cluster.manager.HttpClusterManager; -import org.apache.nifi.cluster.manager.HttpRequestReplicator; -import org.apache.nifi.cluster.manager.HttpResponseMapper; import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.manager.StatusMerger; import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException; import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; @@ -116,6 +111,7 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.components.PropertyDescriptor; import 
org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; @@ -123,8 +119,6 @@ import org.apache.nifi.controller.StandardFlowSerializer; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; -import org.apache.nifi.controller.queue.DropFlowFileState; -import org.apache.nifi.controller.queue.ListFlowFileState; import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskProvider; @@ -135,21 +129,12 @@ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceProvider; -import org.apache.nifi.controller.state.SortedStateUtils; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; -import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; -import org.apache.nifi.controller.status.history.MetricDescriptor; -import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; -import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; -import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; -import org.apache.nifi.controller.status.history.StandardStatusSnapshot; -import org.apache.nifi.controller.status.history.StatusHistoryUtil; -import 
org.apache.nifi.controller.status.history.StatusSnapshot; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.framework.security.util.SslContextFactory; import org.apache.nifi.io.socket.multicast.DiscoverableService; @@ -171,9 +156,7 @@ import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol; import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; @@ -187,74 +170,7 @@ import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.OptimisticLockingManager; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UpdateRevision; -import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinDTO; -import org.apache.nifi.web.api.dto.ComponentStateDTO; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; -import org.apache.nifi.web.api.dto.CountersDTO; -import org.apache.nifi.web.api.dto.DropRequestDTO; -import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.ListingRequestDTO; -import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO; -import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; 
-import org.apache.nifi.web.api.dto.QueueSizeDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.ReportingTaskDTO; -import org.apache.nifi.web.api.dto.StateEntryDTO; -import org.apache.nifi.web.api.dto.StateMapDTO; -import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; -import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; -import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; -import org.apache.nifi.web.api.dto.status.PortStatusDTO; -import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; -import org.apache.nifi.web.api.entity.BulletinBoardEntity; -import org.apache.nifi.web.api.entity.ComponentStateEntity; -import 
org.apache.nifi.web.api.entity.ConnectionStatusEntity; -import org.apache.nifi.web.api.entity.ControllerServiceEntity; -import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; -import org.apache.nifi.web.api.entity.ControllerServicesEntity; -import org.apache.nifi.web.api.entity.ControllerStatusEntity; -import org.apache.nifi.web.api.entity.CountersEntity; -import org.apache.nifi.web.api.entity.DropRequestEntity; -import org.apache.nifi.web.api.entity.FlowSnippetEntity; -import org.apache.nifi.web.api.entity.ListingRequestEntity; -import org.apache.nifi.web.api.entity.PortStatusEntity; -import org.apache.nifi.web.api.entity.ProcessGroupEntity; -import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.ProcessorEntity; -import org.apache.nifi.web.api.entity.ProcessorStatusEntity; -import org.apache.nifi.web.api.entity.ProcessorsEntity; -import org.apache.nifi.web.api.entity.ProvenanceEntity; -import org.apache.nifi.web.api.entity.ProvenanceEventEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; -import org.apache.nifi.web.api.entity.ReportingTaskEntity; -import org.apache.nifi.web.api.entity.ReportingTasksEntity; -import org.apache.nifi.web.api.entity.StatusHistoryEntity; -import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity; import org.apache.nifi.web.util.WebUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -265,7 +181,8 @@ import org.w3c.dom.NodeList; import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; -import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.config.DefaultClientConfig; /** * Provides a cluster manager implementation. 
The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. The manager also communicates with nodes using the @@ -278,7 +195,7 @@ import com.sun.jersey.api.client.ClientResponse; * The start() and stop() methods must be called to initialize and stop the instance. * */ -public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider { +public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider, RequestCompletionCallback { public static final String ROOT_GROUP_ID_ALIAS = "root"; public static final String BULLETIN_CATEGORY = "Clustering"; @@ -322,58 +239,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5; - public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); - public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}"); - public static final Pattern PROCESSOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status"); - public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state"); public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}"); - public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); - public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups/[a-f0-9\\-]{36}"); - - public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))"); - public static final Pattern 
GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status"); - public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/status"); - public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance"); - public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance"); - - public static final String PROVENANCE_URI = "/nifi-api/provenance"; - public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/provenance/[a-f0-9\\-]{36}"); - public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/provenance/events/[0-9]+"); - - public static final Pattern CONTROLLER_SERVICES_URI = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/controller-services/node"); - public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}"); - public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}/state"); - public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}/references"); - public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; - public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}"); - public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}/state"); - public static final Pattern BULLETIN_BOARD_URI_PATTERN = Pattern.compile("/nifi-api/bulletin-board"); - public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = 
Pattern.compile("/nifi-api/system-diagnostics"); - public static final Pattern COUNTERS_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters"); - public static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}"); - - public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = - Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status/history"); - public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history"); - public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern - .compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history"); - public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern - .compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status/history"); - public static final Pattern CONNECTION_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status"); - public static final Pattern INPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/input-ports/[a-f0-9\\-]{36}/status"); - public static final Pattern OUTPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/output-ports/[a-f0-9\\-]{36}/status"); - public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status"); - - public static final Pattern DROP_REQUESTS_URI = Pattern.compile("/nifi-api/connections/[a-f0-9\\-]{36}/drop-requests"); - public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}"); - public static final Pattern LISTING_REQUESTS_URI = Pattern.compile("/nifi-api/connections/[a-f0-9\\-]{36}/listing-requests"); - public static final Pattern LISTING_REQUEST_URI = Pattern.compile("/nifi-api/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}"); + 
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); + public static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}"); private final NiFiProperties properties; - private final HttpRequestReplicator httpRequestReplicator; - private final HttpResponseMapper httpResponseMapper; private final DataFlowManagementService dataFlowManagementService; private final ClusterManagerProtocolSenderListener senderListener; private final OptimisticLockingManager optimisticLockingManager; @@ -404,18 +276,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C private final FlowEngine reportingTaskEngine; private final StandardProcessScheduler processScheduler; private final StateManagerProvider stateManagerProvider; - private final long componentStatusSnapshotMillis; + private final HttpResponseMerger responseMerger = new StandardHttpResponseMerger(this); + private final RequestReplicator httpRequestReplicator; - public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper, + public WebClusterManager( final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener, final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) { - if (httpRequestReplicator == null) { - throw new IllegalArgumentException("HttpRequestReplicator may not be null."); - } else if (httpResponseMapper == null) { - throw new IllegalArgumentException("HttpResponseMapper may not be null."); - } else if (dataFlowManagementService == null) { + if (dataFlowManagementService == null) { throw new IllegalArgumentException("DataFlowManagementService may not be null."); } else if (senderListener == null) { throw new IllegalArgumentException("ClusterManagerProtocolSenderListener may not be 
null."); @@ -423,9 +292,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C throw new IllegalArgumentException("NiFiProperties may not be null."); } - // Ensure that our encryptor/decryptor is properly initialized - this.httpRequestReplicator = httpRequestReplicator; - this.httpResponseMapper = httpResponseMapper; this.dataFlowManagementService = dataFlowManagementService; this.properties = properties; this.bulletinRepository = new VolatileBulletinRepository(); @@ -436,15 +302,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C senderListener.addHandler(this); senderListener.setBulletinRepository(bulletinRepository); - final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); - long snapshotMillis; - try { - snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); - } - componentStatusSnapshotMillis = snapshotMillis; - remoteInputPort = properties.getRemoteInputPort(); if (remoteInputPort == null) { remoteSiteListener = null; @@ -485,9 +342,33 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); - clusterCoordinator = new WebClusterManagerCoordinator(this, senderListener); + clusterCoordinator = new WebClusterManagerCoordinator(this, senderListener, dataFlowManagementService); heartbeatMonitor = new ClusterProtocolHeartbeatMonitor(clusterCoordinator, properties); senderListener.addHandler(heartbeatMonitor); + httpRequestReplicator = createRequestReplicator(properties); + } + + private RequestReplicator createRequestReplicator(final 
NiFiProperties properties) { + final int numThreads = properties.getClusterManagerNodeApiRequestThreads(); + final String connectionTimeout = properties.getClusterManagerNodeApiConnectionTimeout(); + final String readTimeout = properties.getClusterManagerNodeApiReadTimeout(); + final EventReporter eventReporter = createEventReporter(); + + final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties)); + return new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator, connectionTimeout, readTimeout, this, + eventReporter, this, optimisticLockingManager, dataFlowManagementService); + } + + private EventReporter createEventReporter() { + return new EventReporter() { + private static final long serialVersionUID = 7770887158588031619L; + + @Override + public void reportEvent(Severity severity, String category, String message) { + final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); + getBulletinRepository().addBulletin(bulletin); + } + }; } public void start() throws IOException { @@ -1335,23 +1216,23 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } @Override - public void disableReferencingServices(final ControllerServiceNode serviceNode) { - controllerServiceProvider.disableReferencingServices(serviceNode); + public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) { + return controllerServiceProvider.disableReferencingServices(serviceNode); } @Override - public void enableReferencingServices(final ControllerServiceNode serviceNode) { - controllerServiceProvider.enableReferencingServices(serviceNode); + public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) { + return controllerServiceProvider.enableReferencingServices(serviceNode); } @Override - public void scheduleReferencingComponents(final ControllerServiceNode 
serviceNode) { - controllerServiceProvider.scheduleReferencingComponents(serviceNode); + public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) { + return controllerServiceProvider.scheduleReferencingComponents(serviceNode); } @Override - public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { - controllerServiceProvider.unscheduleReferencingComponents(serviceNode); + public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { + return controllerServiceProvider.unscheduleReferencingComponents(serviceNode); } @Override @@ -1676,18 +1557,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C lock.lock(); try { - // check that the request can be applied - if (mutableRequest) { - if (isInSafeMode()) { - throw new SafeModeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while in safe mode"); - } else if (!getNodeIds(Status.DISCONNECTED, Status.DISCONNECTING).isEmpty()) { - throw new DisconnectedNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is disconnected from the cluster"); - } else if (!getNodeIds(Status.CONNECTING).isEmpty()) { - // if any node is connecting and a request can change the flow, then we throw an exception - throw new ConnectingNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is trying to connect to the cluster"); - } - } - final NodeResponse clientResponse = federateRequest(method, uri, parameters, null, headers, nodeIdentifiers); if (clientResponse == null) { if (mutableRequest) { @@ -1913,55 +1782,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx); updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx); - // if the request is 
mutable, we need to verify that it is a valid request for all nodes in the cluster. - if (mutableRequest) { - updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, "150-NodeContinue"); - - final Set<NodeResponse> nodeResponses; - if (entity == null) { - nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders); - } else { - nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders); - } - - updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER); - - for (final NodeResponse response : nodeResponses) { - if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) { - final String nodeDescription = response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort(); - final ClientResponse clientResponse = response.getClientResponse(); - if (clientResponse == null) { - throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus()); - } - final String nodeExplanation = clientResponse.getEntity(String.class); - throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: " + nodeExplanation, response.getThrowable()); - } - } - - // set flow state to unknown to denote a mutable request replication in progress - logger.debug("Setting Flow State to UNKNOWN due to mutable request to {} {}", method, uri); - notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN); - } - // replicate request - final Set<NodeResponse> nodeResponses; - try { - if (entity == null) { - nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders); - } else { - nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders); - } - } catch (final UriConstructionException uce) { - // request was not replicated, so mark the flow with its original state - if (mutableRequest) { - 
notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState); - } + final AsyncClusterResponse clusterResponse = httpRequestReplicator.replicate(nodeIds, method, uri, entity == null ? parameters : entity, updatedHeaders); - throw uce; + final NodeResponse clientResponse; + try { + clientResponse = clusterResponse.awaitMergedResponse(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Thread was interrupted while waiting for a response from one or more nodes", e); + final Set<NodeIdentifier> noResponses = clusterResponse.getNodesInvolved(); + noResponses.removeAll(clusterResponse.getCompletedNodeIdentifiers()); + throw new IllegalClusterStateException("Interrupted while waiting for a response from the following nodes: " + noResponses, e); } - // merge the response - final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest); holder.set(clientResponse); // if we have a response get the updated cluster context for auditing and revision updating @@ -2009,1973 +1843,240 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return holder.get(); } - private static boolean isProcessorsEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches(); - } - - private static boolean isProcessorEndpoint(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) - && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) { - return true; - } else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) { - return true; - } - return false; + private static boolean isCounterEndpoint(final String uriPath) { + return COUNTER_URI_PATTERN.matcher(uriPath).matches(); } - private static boolean isProcessorStatusEndpoint(final URI 
uri, final String method) { - return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); - } - private static boolean isProcessorStateEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); - } + /** Returns nodeBulletins unchanged when ncmBulletins is null or empty; otherwise a new list holding nodeBulletins followed by DTOs created from ncmBulletins. */ public List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> nodeBulletins, final List<Bulletin> ncmBulletins) { + if (ncmBulletins == null || ncmBulletins.isEmpty()) { + return nodeBulletins; + } - private static boolean isProcessGroupEndpoint(final URI uri, final String method) { - return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches(); + final List<BulletinDTO> mergedBulletins = new ArrayList<>(nodeBulletins.size() + ncmBulletins.size()); + mergedBulletins.addAll(nodeBulletins); + mergedBulletins.addAll(createBulletinDtos(ncmBulletins)); + return mergedBulletins; } - private static boolean isConnectionStatusEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); - } - private static boolean isInputPortStatusEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && INPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + /** + * Creates a BulletinDTO for each of the specified Bulletins, in iteration order.
+ * + * @param bulletins the bulletins to convert + * @return a new list with one dto per bulletin + */ + public List<BulletinDTO> createBulletinDtos(final List<Bulletin> bulletins) { + final List<BulletinDTO> bulletinDtos = new ArrayList<>(bulletins.size()); + for (final Bulletin bulletin : bulletins) { + bulletinDtos.add(createBulletinDto(bulletin)); + } + return bulletinDtos; } - private static boolean isOutputPortStatusEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && OUTPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + /** + * Creates a BulletinDTO for the specified Bulletin by copying its id, node address, timestamp, group id, source id, source name, category, level and message. + * + * @param bulletin the bulletin to copy + * @return the populated dto + */ + public BulletinDTO createBulletinDto(final Bulletin bulletin) { + final BulletinDTO dto = new BulletinDTO(); + dto.setId(bulletin.getId()); + dto.setNodeAddress(bulletin.getNodeAddress()); + dto.setTimestamp(bulletin.getTimestamp()); + dto.setGroupId(bulletin.getGroupId()); + dto.setSourceId(bulletin.getSourceId()); + dto.setSourceName(bulletin.getSourceName()); + dto.setCategory(bulletin.getCategory()); + dto.setLevel(bulletin.getLevel()); + dto.setMessage(bulletin.getMessage()); + return dto; } - private static boolean isRemoteProcessGroupStatusEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); - } - private static boolean isGroupStatusEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + /** + * Merges the validation errors into the specified map, recording the corresponding node identifier.
+ * + * @param validationErrorMap map of validation error message to the nodes that reported it; updated in place + * @param nodeId the node the errors are recorded against + * @param nodeValidationErrors validation errors to record; ignored when null + */ + public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) { + if (nodeValidationErrors != null) { + for (final String nodeValidationError : nodeValidationErrors) { + Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError); + if (nodeSet == null) { + nodeSet = new HashSet<>(); + validationErrorMap.put(nodeValidationError, nodeSet); + } + nodeSet.add(nodeId); + } + } } - private static boolean isControllerStatusEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); - } + /** + * Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes. 
+ * + * @param validationErrorMap map of validation error message to the nodes that reported it + * @param totalNodes node count at which an error is treated as present on all nodes + * @return messages reported by exactly totalNodes nodes as-is; every other message once per reporting node, prefixed with that node's "apiAddress:apiPort -- " + */ + public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) { + final Set<String> normalizedValidationErrors = new HashSet<>(); + for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) { + final String msg = validationEntry.getKey(); + final Set<NodeIdentifier> nodeIds = validationEntry.getValue(); - private static boolean isTemplateEndpoint(final URI uri, final String method) { - return "POST".equalsIgnoreCase(method) && TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches(); + if (nodeIds.size() == totalNodes) { + normalizedValidationErrors.add(msg); + } else { + for (final NodeIdentifier nodeId : nodeIds) { + normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg); + } + } + } + return normalizedValidationErrors; } - private static boolean isFlowSnippetEndpoint(final URI uri, final
String method) { - return "POST".equalsIgnoreCase(method) && FLOW_SNIPPET_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches(); - } - private static boolean isRemoteProcessGroupsEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches(); - } + // TODO: This is temporary. Only here because while we have NCM, we must merge its bulletins. Once we get rid + // of the NCM, this goes away completely. + public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/status"); - private static boolean isProcessorStatusHistoryEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); + public boolean isControllerStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); } - private static boolean isProcessGroupStatusHistoryEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); - } - private static boolean isRemoteProcessGroupStatusHistoryEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); - } + @Override + public void afterRequest(final String uriPath, final String method, final Set<NodeResponse> nodeResponses) { + final boolean mutableRequest = canChangeNodeState(method, null); - private static boolean isConnectionStatusHistoryEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches(); - } + /* + * Nodes that encountered issues handling the request are marked as + * disconnected for mutable requests (e.g., post, put,
delete). For + * other requests (e.g., get, head), the nodes remain in their current + * state even if they had problems handling the request. + */ + if (mutableRequest) { + final Set<NodeResponse> problematicNodeResponses = responseMerger.getProblematicNodeResponses(nodeResponses); - private static boolean isBulletinBoardEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches(); - } + // all nodes failed + final boolean allNodesFailed = problematicNodeResponses.size() == nodeResponses.size(); - private static boolean isSystemDiagnosticsEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches(); - } + // some nodes had a problematic response because of a missing counter, ensure they are not disconnected + final boolean someNodesFailedMissingCounter = !problematicNodeResponses.isEmpty() + && problematicNodeResponses.size() < nodeResponses.size() && isMissingCounter(problematicNodeResponses, uriPath); - private static boolean isCountersEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches(); - } + // keep nodes connected when every node failed, or when the only failures were due to a missing counter + if (allNodesFailed || someNodesFailedMissingCounter) { + for (final NodeResponse nodeResponse : nodeResponses) { + final Node node = getRawNode(nodeResponse.getNodeId().getId()); + if (problematicNodeResponses.contains(nodeResponse)) { + node.setStatus(Status.CONNECTED); + problematicNodeResponses.remove(nodeResponse); + } + } + } - private static boolean isRemoteProcessGroupEndpoint(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) { - return true; - } else if 
("POST".equalsIgnoreCase(method) &&
REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) { - return true; + // disconnect problematic nodes; NOTE(review): when allNodesFailed, the loop above appears to have already emptied problematicNodeResponses, so the else branch below looks unreachable (confirm) + if (!problematicNodeResponses.isEmpty()) { + if (problematicNodeResponses.size() < nodeResponses.size()) { + logger.warn(String.format("The following nodes failed to process URI '%s'. Requesting each node to disconnect from cluster: %s", uriPath, problematicNodeResponses)); + disconnectNodes(problematicNodeResponses, "Failed to process URI " + uriPath); + } else { + logger.warn("All nodes failed to process URI {}. As a result, no node will be disconnected from cluster", uriPath); + } + } } - - return false; } - private static boolean isProvenanceQueryEndpoint(final URI uri, final String method) { - if ("POST".equalsIgnoreCase(method) && PROVENANCE_URI.equals(uri.getPath())) { - return true; - } else if ("GET".equalsIgnoreCase(method) && PROVENANCE_QUERY_URI.matcher(uri.getPath()).matches()) { - return true; + + /** + * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and is not comprised of responses from all nodes in the cluster (at least + * one node contained the counter in question).
+ * + * @param problematicNodeResponses The problematic node responses + * @param uriPath The path of the URI for the request; any path that is not a counter endpoint yields false + * @return true only when uriPath is a counter endpoint and no problematic response has a status other than 404 + */ + private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final String uriPath) { + if (isCounterEndpoint(uriPath)) { + boolean notFound = true; + for (final NodeResponse problematicResponse : problematicNodeResponses) { + if (problematicResponse.getStatus() != 404) { + notFound = false; + break; + } + } + return notFound; } return false; } - private static boolean isProvenanceEventEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches(); - } - private static boolean isListFlowFilesEndpoint(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) { - return true; - } else if ("POST".equalsIgnoreCase(method) && LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) { - return true; + /** + * A helper method to disconnect nodes that returned unsuccessful HTTP responses because of a replicated request. One disconnection task per response is submitted to a fixed-size thread pool; the pool is then shut down without waiting for the tasks to finish.
+ * + * @param nodeResponses responses identifying the nodes to disconnect; nothing is done when null or empty + * @param explanation text handed to requestDisconnectionQuietly for every node + */ + private void disconnectNodes(final Set<NodeResponse> nodeResponses, final String explanation) { + // return fast if nothing to do + if (nodeResponses == null || nodeResponses.isEmpty()) { + return; } - return false; - } - - private static boolean isCounterEndpoint(final URI uri) { - return COUNTER_URI_PATTERN.matcher(uri.getPath()).matches(); - } + final ExecutorService executorService = Executors.newFixedThreadPool(properties.getClusterManagerProtocolThreads()); + final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService); + for (final NodeResponse nodeResponse : nodeResponses) { + completionService.submit(new Runnable() { + @Override + public void run() { + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + final int responseStatus = nodeResponse.getStatus(); + final URI requestUri = nodeResponse.getRequestUri(); + final StringBuilder msgBuilder = new StringBuilder(); + msgBuilder + .append("Requesting disconnection for node ") + .append(nodeId) + .append(" for request URI ") + .append(requestUri); + if (nodeResponse.hasThrowable()) { + msgBuilder.append(" because manager encountered exception when issuing request: ") + .append(nodeResponse.getThrowable()); + // log stack trace anytime we have a throwable + ((NiFiLog) logger).getWrappedLog().info(msgBuilder.toString(), nodeResponse.getThrowable()); + addEvent(nodeId, "Manager encountered exception when issuing request for URI " + requestUri); + addBulletin(nodeId, Severity.ERROR, "Manager encountered exception when issuing request for URI " + requestUri + "; node will be disconnected"); + } else { + msgBuilder.append(" because HTTP response status was ") + .append(responseStatus); + logger.info(msgBuilder.toString()); + addEvent(nodeId, "HTTP response status was unsuccessful 
(" + responseStatus + ") for request URI " + requestUri); + addBulletin(nodeId, Severity.ERROR, "HTTP response status was unsuccessful (" + responseStatus + ") for request URI " +
requestUri); + } + requestDisconnectionQuietly(nodeId, explanation); + } + }, null); + } - private static boolean isControllerServicesEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.matcher(uri.getPath()).matches(); + executorService.shutdown(); } - private static boolean isControllerServiceEndpoint(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) { - return true; - } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.matcher(uri.getPath()).matches()) { - return true; - } - - return false; - } - - private static boolean isControllerServiceStateEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); - } - - private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) { - return true; - } - - return false; - } - - private static boolean isReportingTasksEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath()); - } - - private static boolean isReportingTaskEndpoint(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) { - return true; - } else if ("POST".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath())) { - return true; - } - - return false; - } - - private static boolean isReportingTaskStateEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); - } - -
private static boolean isDropRequestEndpoint(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && DROP_REQUEST_URI.matcher(uri.getPath()).matches()) { - return true; - } else if (("POST".equalsIgnoreCase(method) && DROP_REQUESTS_URI.matcher(uri.getPath()).matches())) { - return true; - } - - return false; - } - - static boolean isResponseInterpreted(final URI uri, final String method) { - return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method) || isProcessorStateEndpoint(uri, method) - || isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method) - || isProcessGroupEndpoint(uri, method) - || isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method) - || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method) - || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) - || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) - || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method) - || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method) - || isGroupStatusEndpoint(uri, method) || isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) - || isConnectionStatusEndpoint(uri, method) || isRemoteProcessGroupStatusEndpoint(uri, method) - || isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method) - || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method) - || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method) - || isBulletinBoardEndpoint(uri, method) || isSystemDiagnosticsEndpoint(uri, method) - || isCountersEndpoint(uri, method); - } - - private void mergeProcessorValidationErrors(final ProcessorDTO processor, 
Map<NodeIdentifier, ProcessorDTO> processorMap) { - final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); - - for (final Map.Entry<NodeIdentifier, ProcessorDTO> nodeEntry : processorMap.entrySet()) { - final NodeIdentifier nodeId = nodeEntry.getKey(); - final ProcessorDTO nodeProcessor = nodeEntry.getValue(); - - // merge the validation errors - mergeValidationErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors()); - } - - // set the merged the validation errors - processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size())); - } - - private void mergeComponentState(final ComponentStateDTO componentState, Map<NodeIdentifier, ComponentStateDTO> componentStateMap) { - List<StateEntryDTO> localStateEntries = new ArrayList<>(); - - int totalStateEntries = 0; - for (final Map.Entry<NodeIdentifier, ComponentStateDTO> nodeEntry : componentStateMap.entrySet()) { - final ComponentStateDTO nodeComponentState = nodeEntry.getValue(); - final NodeIdentifier nodeId = nodeEntry.getKey(); - final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); - - final StateMapDTO nodeLocalStateMap = nodeComponentState.getLocalState(); - if (nodeLocalStateMap.getState() != null) { - totalStateEntries += nodeLocalStateMap.getTotalEntryCount(); - - for (final StateEntryDTO nodeStateEntry : nodeLocalStateMap.getState()) { - nodeStateEntry.setClusterNodeId(nodeId.getId()); - nodeStateEntry.setClusterNodeAddress(nodeAddress); - localStateEntries.add(nodeStateEntry); - } - } - } - - // ensure appropriate sort - Collections.sort(localStateEntries, SortedStateUtils.getEntryDtoComparator()); - - // sublist if necessary - if (localStateEntries.size() > SortedStateUtils.MAX_COMPONENT_STATE_ENTRIES) { - localStateEntries = localStateEntries.subList(0, SortedStateUtils.MAX_COMPONENT_STATE_ENTRIES); - } - - // add all the local state entries - 
componentState.getLocalState().setTotalEntryCount(totalStateEntries); - componentState.getLocalState().setState(localStateEntries); - } - - - private void mergeSystemDiagnostics(final SystemDiagnosticsDTO target, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, SystemDiagnosticsDTO> resultMap) { - final SystemDiagnosticsDTO mergedSystemDiagnostics = target; - mergedSystemDiagnostics.setNodeSnapshots(new ArrayList<NodeSystemDiagnosticsSnapshotDTO>()); - - final NodeSystemDiagnosticsSnapshotDTO selectedNodeSnapshot = new NodeSystemDiagnosticsSnapshotDTO(); - selectedNodeSnapshot.setSnapshot(target.getAggregateSnapshot().clone()); - selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); - selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); - selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); - - mergedSystemDiagnostics.getNodeSnapshots().add(selectedNodeSnapshot); - - for (final Map.Entry<NodeIdentifier, SystemDiagnosticsDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final SystemDiagnosticsDTO toMerge = entry.getValue(); - if (toMerge == target) { - continue; - } - - StatusMerger.merge(mergedSystemDiagnostics, toMerge, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - private void mergeCounters(final CountersDTO target, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, CountersDTO> resultMap) { - final CountersDTO mergedCounters = target; - mergedCounters.setNodeSnapshots(new ArrayList<NodeCountersSnapshotDTO>()); - - final NodeCountersSnapshotDTO selectedNodeSnapshot = new NodeCountersSnapshotDTO(); - selectedNodeSnapshot.setSnapshot(target.getAggregateSnapshot().clone()); - selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); - selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); - selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); - - mergedCounters.getNodeSnapshots().add(selectedNodeSnapshot); - - for (final 
Map.Entry<NodeIdentifier, CountersDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final CountersDTO toMerge = entry.getValue(); - if (toMerge == target) { - continue; - } - - StatusMerger.merge(mergedCounters, toMerge, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) { - final ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto; - mergedProcessGroupStatus.setNodeSnapshots(new ArrayList<NodeProcessGroupStatusSnapshotDTO>()); - - final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); - selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); - selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); - selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); - selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); - - mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot); - - for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final ProcessGroupStatusDTO nodeProcessGroupStatus = entry.getValue(); - if (nodeProcessGroupStatus == mergedProcessGroupStatus) { - continue; - } - - final ProcessGroupStatusSnapshotDTO nodeSnapshot = nodeProcessGroupStatus.getAggregateSnapshot(); - for (final RemoteProcessGroupStatusSnapshotDTO remoteProcessGroupStatus : nodeSnapshot.getRemoteProcessGroupStatusSnapshots()) { - final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); - if (!nodeAuthorizationIssues.isEmpty()) { - for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { - final String Issue = iter.next(); - iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] 
-- " + Issue); - } - remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues); - } - } - - StatusMerger.merge(mergedProcessGroupStatus, nodeProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - - private void mergeProcessorStatus(final ProcessorStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessorStatusDTO> resultMap) { - final ProcessorStatusDTO mergedProcessorStatus = statusDto; - mergedProcessorStatus.setNodeSnapshots(new ArrayList<NodeProcessorStatusSnapshotDTO>()); - - final NodeProcessorStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessorStatusSnapshotDTO(); - selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); - selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); - selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); - selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); - - mergedProcessorStatus.getNodeSnapshots().add(selectedNodeSnapshot); - - // merge the other nodes - for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final ProcessorStatusDTO nodeProcessorStatus = entry.getValue(); - if (nodeProcessorStatus == statusDto) { - continue; - } - - StatusMerger.merge(mergedProcessorStatus, nodeProcessorStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - private void mergeConnectionStatus(final ConnectionStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ConnectionStatusDTO> resultMap) { - final ConnectionStatusDTO mergedConnectionStatus = statusDto; - mergedConnectionStatus.setNodeSnapshots(new ArrayList<NodeConnectionStatusSnapshotDTO>()); - - final NodeConnectionStatusSnapshotDTO selectedNodeSnapshot = new NodeConnectionStatusSnapshotDTO(); - selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); - 
selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); - selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); - selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); - - mergedConnectionStatus.getNodeSnapshots().add(selectedNodeSnapshot); - - // merge the other nodes - for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final ConnectionStatusDTO nodeConnectionStatus = entry.getValue(); - if (nodeConnectionStatus == statusDto) { - continue; - } - - StatusMerger.merge(mergedConnectionStatus, nodeConnectionStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - private void mergePortStatus(final PortStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, PortStatusDTO> resultMap) { - final PortStatusDTO mergedPortStatus = statusDto; - mergedPortStatus.setNodeSnapshots(new ArrayList<NodePortStatusSnapshotDTO>()); - - final NodePortStatusSnapshotDTO selectedNodeSnapshot = new NodePortStatusSnapshotDTO(); - selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); - selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); - selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); - selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); - - mergedPortStatus.getNodeSnapshots().add(selectedNodeSnapshot); - - // merge the other nodes - for (final Map.Entry<NodeIdentifier, PortStatusDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final PortStatusDTO nodePortStatus = entry.getValue(); - if (nodePortStatus == statusDto) { - continue; - } - - StatusMerger.merge(mergedPortStatus, nodePortStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - private void mergeRemoteProcessGroupStatus(final RemoteProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> 
resultMap) { - final RemoteProcessGroupStatusDTO mergedRemoteProcessGroupStatus = statusDto; - mergedRemoteProcessGroupStatus.setNodeSnapshots(new ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>()); - - final NodeRemoteProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); - selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone()); - selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); - selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); - selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); - - mergedRemoteProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot); - - // merge the other nodes - for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final RemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatus = entry.getValue(); - if (nodeRemoteProcessGroupStatus == statusDto) { - continue; - } - - StatusMerger.merge(mergedRemoteProcessGroupStatus, nodeRemoteProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - private void mergeControllerStatus(final ControllerStatusDTO statusDto, final Map<NodeIdentifier, ControllerStatusDTO> resultMap) { - ControllerStatusDTO mergedStatus = statusDto; - for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final ControllerStatusDTO nodeStatus = entry.getValue(); - - final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); - for (final BulletinDTO bulletin : nodeStatus.getBulletins()) { - bulletin.setNodeAddress(nodeAddress); - } - for (final BulletinDTO bulletin : nodeStatus.getControllerServiceBulletins()) { - bulletin.setNodeAddress(nodeAddress); - } - for (final BulletinDTO bulletin : nodeStatus.getReportingTaskBulletins()) { - bulletin.setNodeAddress(nodeAddress); - } - - if 
(nodeStatus == mergedStatus) { - continue; - } - - StatusMerger.merge(mergedStatus, nodeStatus); - } - - final int totalNodeCount = getNodeIds().size(); - final int connectedNodeCount = getNodeIds(Status.CONNECTED).size(); - - final List<Bulletin> ncmControllerBulletins = getBulletinRepository().findBulletinsForController(); - mergedStatus.setBulletins(mergeNCMBulletins(mergedStatus.getBulletins(), ncmControllerBulletins)); - - // get the controller service bulletins - final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); - final List<Bulletin> ncmServiceBulletins = getBulletinRepository().findBulletins(controllerServiceQuery); - mergedStatus.setControllerServiceBulletins(mergeNCMBulletins(mergedStatus.getControllerServiceBulletins(), ncmServiceBulletins)); - - // get the reporting task bulletins - final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); - final List<Bulletin> ncmReportingTaskBulletins = getBulletinRepository().findBulletins(reportingTaskQuery); - mergedStatus.setReportingTaskBulletins(mergeNCMBulletins(mergedStatus.getReportingTaskBulletins(), ncmReportingTaskBulletins)); - - mergedStatus.setConnectedNodeCount(connectedNodeCount); - mergedStatus.setTotalNodeCount(totalNodeCount); - StatusMerger.updatePrettyPrintedFields(mergedStatus); - } - - private List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> nodeBulletins, final List<Bulletin> ncmBulletins) { - if (ncmBulletins == null || ncmBulletins.isEmpty()) { - return nodeBulletins; - } - - final List<BulletinDTO> mergedBulletins = new ArrayList<>(nodeBulletins.size() + ncmBulletins.size()); - mergedBulletins.addAll(nodeBulletins); - mergedBulletins.addAll(createBulletinDtos(ncmBulletins)); - return mergedBulletins; - } - - private void mergeBulletinBoard(final BulletinBoardDTO nodeBulletinBoard, final Map<NodeIdentifier, BulletinBoardDTO> resultMap) { - 
final List<BulletinDTO> bulletinDtos = new ArrayList<>(); - for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final BulletinBoardDTO boardDto = entry.getValue(); - final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); - - for (final BulletinDTO bulletin : boardDto.getBulletins()) { - bulletin.setNodeAddress(nodeAddress); - bulletinDtos.add(bulletin); - } - } - - Collections.sort(bulletinDtos, new Comparator<BulletinDTO>() { - @Override - public int compare(final BulletinDTO o1, final BulletinDTO o2) { - final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp()); - if (timeComparison != 0) { - return timeComparison; - } - - return o1.getNodeAddress().compareTo(o2.getNodeAddress()); - } - }); - - nodeBulletinBoard.setBulletins(bulletinDtos); - } - - /** - * Creates BulletinDTOs for the specified Bulletins. - * - * @param bulletins bulletin - * @return dto - */ - public List<BulletinDTO> createBulletinDtos(final List<Bulletin> bulletins) { - final List<BulletinDTO> bulletinDtos = new ArrayList<>(bulletins.size()); - for (final Bulletin bulletin : bulletins) { - bulletinDtos.add(createBulletinDto(bulletin)); - } - return bulletinDtos; - } - - /** - * Creates a BulletinDTO for the specified Bulletin. 
- * - * @param bulletin bulletin - * @return dto - */ - public BulletinDTO createBulletinDto(final Bulletin bulletin) { - final BulletinDTO dto = new BulletinDTO(); - dto.setId(bulletin.getId()); - dto.setNodeAddress(bulletin.getNodeAddress()); - dto.setTimestamp(bulletin.getTimestamp()); - dto.setGroupId(bulletin.getGroupId()); - dto.setSourceId(bulletin.getSourceId()); - dto.setSourceName(bulletin.getSourceName()); - dto.setCategory(bulletin.getCategory()); - dto.setLevel(bulletin.getLevel()); - dto.setMessage(bulletin.getMessage()); - return dto; - } - - private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) { - final ProvenanceResultsDTO results = provenanceDto.getResults(); - final ProvenanceRequestDTO request = provenanceDto.getRequest(); - final List<ProvenanceEventDTO> allResults = new ArrayList<>(1024); - - final Set<String> errors = new HashSet<>(); - Date oldestEventDate = new Date(); - int percentageComplete = 0; - boolean finished = true; - - long totalRecords = 0; - for (final Map.Entry<NodeIdentifier, ProvenanceDTO> entry : resultMap.entrySet()) { - final NodeIdentifier nodeIdentifier = entry.getKey(); - final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); - - final ProvenanceDTO nodeDto = entry.getValue(); - final ProvenanceResultsDTO nodeResultDto = nodeDto.getResults(); - if (nodeResultDto != null && nodeResultDto.getProvenanceEvents() != null) { - // increment the total number of records - totalRecords += nodeResultDto.getTotalCount(); - - // populate the cluster identifier - for (final ProvenanceEventDTO eventDto : nodeResultDto.getProvenanceEvents()) { - eventDto.setClusterNodeId(nodeIdentifier.getId()); - eventDto.setClusterNodeAddress(nodeAddress); - // add node identifier to the event's id so that it is unique across cluster - eventDto.setId(nodeIdentifier.getId() + 
eventDto.getId()); - allResults.add(eventDto); - } - } - - if (nodeResultDto.getOldestEvent() != null && nodeResultDto.getOldestEvent().before(oldestEventDate)) { - oldestEventDate = nodeResultDto.getOldestEvent(); - } - - if (nodeResultDto.getErrors() != null) { - for (final String error : nodeResultDto.getErrors()) { - errors.add(nodeAddress + " -- " + error); - } - } - - percentageComplete += nodeDto.getPercentCompleted(); - if (!nodeDto.isFinished()) { - finished = false; - } - } - percentageComplete /= resultMap.size(); - - // consider any problematic responses as errors - for (final NodeResponse problematicResponse : problematicResponses) { - final NodeIdentifier problemNode = problematicResponse.getNodeId(); - final String problemNodeAddress = problemNode.getApiAddress() + ":" + problemNode.getApiPort(); - errors.add(String.format("%s -- Request did not complete successfully (Status code: %s)", problemNodeAddress, problematicResponse.getStatus())); - } - - // Since we get back up to the maximum number of results from each node, we need to sort those values and then - // grab only the first X number of them. We do a sort based on time, such that the newest are included. - // If 2 events have the same timestamp, we do a secondary sort based on Cluster Node Identifier. 
If those are - // equal, we perform a terciary sort based on the the event id - Collections.sort(allResults, new Comparator<ProvenanceEventDTO>() { - @Override - public int compare(final ProvenanceEventDTO o1, final ProvenanceEventDTO o2) { - final int eventTimeComparison = o1.getEventTime().compareTo(o2.getEventTime()); - if (eventTimeComparison != 0) { - return -eventTimeComparison; - } - - final String nodeId1 = o1.getClusterNodeId(); - final String nodeId2 = o2.getClusterNodeId(); - final int nodeIdComparison; - if (nodeId1 == null && nodeId2 == null) { - nodeIdComparison = 0; - } else if (nodeId1 == null) { - nodeIdComparison = 1; - } else if (nodeId2 == null) { - nodeIdComparison = -1; - } else { - nodeIdComparison = -nodeId1.compareTo(nodeId2); - } - - if (nodeIdComparison != 0) { - return nodeIdComparison; - } - - return -Long.compare(o1.getEventId(), o2.getEventId()); - } - }); - - final int maxResults = request.getMaxResults().intValue(); - final List<ProvenanceEventDTO> selectedResults; - if (allResults.size() < maxResults) { - selectedResults = allResults; - } else { - selectedResults = allResults.subList(0, maxResults); - } - - // include any errors - if (errors.size() > 0) { - results.setErrors(errors); - } - - results.setTotalCount(totalRecords); - results.setTotal(FormatUtils.formatCount(totalRecords)); - results.setProvenanceEvents(selectedResults); - results.setOldestEvent(oldestEventDate); - results.setGenerated(new Date()); - provenanceDto.setPercentCompleted(percentageComplete); - provenanceDto.setFinished(finished); - } - - private void mergeRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroup, final Map<NodeIdentifier, RemoteProcessGroupDTO> remoteProcessGroupMap) { - final RemoteProcessGroupContentsDTO remoteProcessGroupContents = remoteProcessGroup.getContents(); - - Boolean mergedIsTargetSecure = null; - final List<String> mergedAuthorizationIssues = new ArrayList<>(); - final Set<RemoteProcessGroupPortDTO> mergedInputPorts = 
new HashSet<>(); - final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new HashSet<>(); - - for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry : remoteProcessGroupMap.entrySet()) { - final NodeIdentifier nodeId = nodeEntry.getKey(); - final RemoteProcessGroupDTO nodeRemoteProcessGroupDto = nodeEntry.getValue(); - - // merge the issues - final List<String> nodeAuthorizationIssues = nodeRemoteProcessGroupDto.getAuthorizationIssues(); - if (nodeAuthorizationIssues != null && !nodeAuthorizationIssues.isEmpty()) { - for (final String nodeAuthorizationIssue : nodeAuthorizationIssues) { - mergedAuthorizationIssues.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + nodeAuthorizationIssue); - } - } - - // use the first target secure flag since they will all be the same - final Boolean nodeIsTargetSecure = nodeRemoteProcessGroupDto.isTargetSecure(); - if (mergedIsTargetSecure == null) { - mergedIsTargetSecure = nodeIsTargetSecure; - } - - // merge the ports in the contents - final RemoteProcessGroupContentsDTO nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroupDto.getContents(); - if (remoteProcessGroupContents != null && nodeRemoteProcessGroupContentsDto != null) { - if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) { - mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts()); - } - if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != null) { - mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts()); - } - } - } - - if (remoteProcessGroupContents != null) { - if (!mergedInputPorts.isEmpty()) { - remoteProcessGroupContents.setInputPorts(mergedInputPorts); - } - if (!mergedOutputPorts.isEmpty()) { - remoteProcessGroupContents.setOutputPorts(mergedOutputPorts); - } - } - - if (mergedIsTargetSecure != null) { - remoteProcessGroup.setTargetSecure(mergedIsTargetSecure); - } - - if (!mergedAuthorizationIssues.isEmpty()) { - 
remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues); - } - } - - private void mergeControllerServiceReferences( - final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) { - final Map<String, Integer> activeThreadCounts = new HashMap<>(); - final Map<String, String> states = new HashMap<>(); - for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeEntry : referencingComponentMap.entrySet()) { - final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue(); - - // go through all the nodes referencing components - if (nodeReferencingComponents != null) { - for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) { - // handle active thread counts - if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) { - final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId()); - if (current == null) { - activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount()); - } else { - activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current); - } - } - - // handle controller service state - final String state = states.get(nodeReferencingComponent.getId()); - if (state == null) { - if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) { - states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name()); - } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) { - states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name()); - } - } - } - } - } - - // go through each referencing components - for (final 
ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) { - final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId()); - if (activeThreadCount != null) { - referencingComponent.setActiveThreadCount(activeThreadCount); - } - - final String state = states.get(referencingComponent.getId()); - if (state != null) { - referencingComponent.setState(state); - } - } - } - - private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) { - final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); - final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents(); - final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>(); - - String state = null; - for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) { - final NodeIdentifier nodeId = nodeEntry.getKey(); - final ControllerServiceDTO nodeControllerService = nodeEntry.getValue(); - - if (state == null) { - if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) { - state = ControllerServiceState.DISABLING.name(); - } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) { - state = ControllerServiceState.ENABLING.name(); - } - } - - for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) { - nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents()); - } - - // merge the validation errors - mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors()); - } - - // merge the referencing components - mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap); - - // 
store the 'transition' state is applicable - if (state != null) { - controllerService.setState(state); - } - - // set the merged the validation errors - controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size())); - } - - private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) { - final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); - - int activeThreadCount = 0; - for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : reportingTaskMap.entrySet()) { - final NodeIdentifier nodeId = nodeEntry.getKey(); - final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue(); - - if (nodeReportingTask.getActiveThreadCount() != null) { - activeThreadCount += nodeReportingTask.getActiveThreadCount(); - } - - // merge the validation errors - mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors()); - } - - // set the merged active thread counts - reportingTask.setActiveThreadCount(activeThreadCount); - - // set the merged the validation errors - reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, reportingTaskMap.size())); - } - - /** - * Merges the validation errors into the specified map, recording the corresponding node identifier. 
- * - * @param validationErrorMap map - * @param nodeId id - * @param nodeValidationErrors errors - */ - public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) { - if (nodeValidationErrors != null) { - for (final String nodeValidationError : nodeValidationErrors) { - Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError); - if (nodeSet == null) { - nodeSet = new HashSet<>(); - validationErrorMap.put(nodeValidationError, nodeSet); - } - nodeSet.add(nodeId); - } - } - } - - /** - * Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes. - * - * @param validationErrorMap map - * @param totalNodes total - * @return normalized errors - */ - public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) { - final Set<String> normalizedValidationErrors = new HashSet<>(); - for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) { - final String msg = validationEntry.getKey(); - final Set<NodeIdentifier> nodeIds = validationEntry.getValue(); - - if (nodeIds.size() == totalNodes) { - normalizedValidationErrors.add(msg); - } else { - for (final NodeIdentifier nodeId : nodeIds) { - normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg); - } - } - } - return normalizedValidationErrors; - } - - - /** - * Merges the listing requests in the specified map into the specified listing request - * - * @param listingRequest the target listing request - * @param listingRequestMap the mapping of all responses being merged - */ - private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) { - final Comparator<FlowFileSummaryDTO> comparator = new 
Comparator<FlowFileSummaryDTO>() { - @Override - public int compare(final FlowFileSummaryDTO dto1, final FlowFileSummaryDTO dto2) { - int positionCompare = dto1.getPosition().compareTo(dto2.getPosition()); - if (positionCompare != 0) { - return positionCompare; - } - - final String address1 = dto1.getClusterNodeAddress(); - final String address2 = dto2.getClusterNodeAddress(); - if (address1 == null && address2 == null) { - return 0; - } - if (address1 == null) { - return 1; - } - if (address2 == null) { - return -1; - } - return address1.compareTo(address2); - } - }; - - final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator); - - ListFlowFileState state = null; - int numStepsCompleted = 0; - int numStepsTotal = 0; - int objectCount = 0; - long byteCount = 0; - boolean finished = true; - for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : listingRequestMap.entrySet()) { - final NodeIdentifier nodeIdentifier = entry.getKey(); - final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); - - final ListingRequestDTO nodeRequest = entry.getValue(); - - numStepsTotal++; - if (Boolean.TRUE.equals(nodeRequest.getFinished())) { - numStepsCompleted++; - } - - final QueueSizeDTO nodeQueueSize = nodeRequest.getQueueSize(); -
<TRUNCATED>
