http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 0000000,4d5455f..d3688af mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@@ -1,0 -1,3628 +1,4187 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.cluster.manager.impl; + -import java.io.File; ++import java.io.ByteArrayInputStream; ++import java.io.ByteArrayOutputStream; + import java.io.IOException; + import java.io.OutputStream; + import java.io.Serializable; + import java.net.URI; -import java.net.URL; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.Comparator; + import java.util.Date; + import java.util.HashMap; + import java.util.HashSet; + import java.util.LinkedHashMap; + import java.util.LinkedHashSet; + import java.util.List; + import java.util.ListIterator; + import java.util.Map; + import java.util.Queue; + import java.util.Set; + import java.util.Timer; + import java.util.TimerTask; + import java.util.TreeMap; + import java.util.UUID; + import java.util.concurrent.CompletionService; ++import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentLinkedQueue; ++import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutorCompletionService; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + import java.util.regex.Pattern; + + import javax.net.ssl.SSLContext; + import javax.ws.rs.HttpMethod; + import javax.ws.rs.WebApplicationException; + import javax.ws.rs.core.StreamingOutput; -import javax.xml.XMLConstants; + import javax.xml.parsers.DocumentBuilder; + import javax.xml.parsers.DocumentBuilderFactory; + import javax.xml.parsers.ParserConfigurationException; ++import javax.xml.transform.OutputKeys; ++import javax.xml.transform.Transformer; ++import javax.xml.transform.TransformerException; ++import javax.xml.transform.TransformerFactory; + import 
javax.xml.transform.dom.DOMSource; -import javax.xml.validation.Schema; -import javax.xml.validation.SchemaFactory; -import javax.xml.validation.Validator; ++import javax.xml.transform.stream.StreamResult; + + import org.apache.commons.lang3.StringUtils; + import org.apache.nifi.admin.service.AuditService; ++import org.apache.nifi.annotation.lifecycle.OnAdded; ++import org.apache.nifi.annotation.lifecycle.OnRemoved; + import org.apache.nifi.cluster.BulletinsPayload; + import org.apache.nifi.cluster.HeartbeatPayload; + import org.apache.nifi.cluster.context.ClusterContext; + import org.apache.nifi.cluster.context.ClusterContextImpl; + import org.apache.nifi.cluster.event.Event; + import org.apache.nifi.cluster.event.EventManager; + import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; + import org.apache.nifi.cluster.flow.ClusterDataFlow; + import org.apache.nifi.cluster.flow.DaoException; + import org.apache.nifi.cluster.flow.DataFlowManagementService; + import org.apache.nifi.cluster.flow.PersistedFlowState; + import org.apache.nifi.cluster.manager.HttpClusterManager; + import org.apache.nifi.cluster.manager.HttpRequestReplicator; + import org.apache.nifi.cluster.manager.HttpResponseMapper; + import org.apache.nifi.cluster.manager.NodeResponse; + import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; + import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; + import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; + import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; + import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; + import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException; + import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException; + import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; + import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException; + import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException; + import org.apache.nifi.cluster.manager.exception.NodeReconnectionException; + import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException; + import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException; + import org.apache.nifi.cluster.manager.exception.UnknownNodeException; + import org.apache.nifi.cluster.manager.exception.UriConstructionException; + import org.apache.nifi.cluster.node.Node; + import org.apache.nifi.cluster.node.Node.Status; + import org.apache.nifi.cluster.protocol.ConnectionRequest; + import org.apache.nifi.cluster.protocol.ConnectionResponse; + import org.apache.nifi.cluster.protocol.Heartbeat; + import org.apache.nifi.cluster.protocol.NodeBulletins; + import org.apache.nifi.cluster.protocol.NodeIdentifier; + import org.apache.nifi.cluster.protocol.ProtocolException; + import org.apache.nifi.cluster.protocol.ProtocolHandler; + import org.apache.nifi.cluster.protocol.StandardDataFlow; + import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener; + import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster; + import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; + import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; + import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; + import org.apache.nifi.cluster.protocol.message.DisconnectMessage; + import 
org.apache.nifi.cluster.protocol.message.HeartbeatMessage; + import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; + import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; + import org.apache.nifi.cluster.protocol.message.ProtocolMessage; + import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; + import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; + import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.controller.ControllerService; + import org.apache.nifi.controller.Heartbeater; + import org.apache.nifi.controller.ReportingTaskNode; ++import org.apache.nifi.controller.StandardFlowSerializer; + import org.apache.nifi.controller.ValidationContextFactory; ++import org.apache.nifi.controller.exception.ProcessorLifeCycleException; + import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; + import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; ++import org.apache.nifi.controller.reporting.ReportingTaskProvider; + import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; ++import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; + import org.apache.nifi.controller.scheduling.StandardProcessScheduler; + import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; ++import org.apache.nifi.controller.service.ControllerServiceLoader; + import org.apache.nifi.controller.service.ControllerServiceNode; + import org.apache.nifi.controller.service.ControllerServiceProvider; + import org.apache.nifi.controller.service.StandardControllerServiceProvider; + import org.apache.nifi.controller.status.ProcessGroupStatus; + import org.apache.nifi.controller.status.RemoteProcessGroupStatus; + import org.apache.nifi.controller.status.history.ComponentStatusRepository; + import org.apache.nifi.controller.status.history.MetricDescriptor; + import org.apache.nifi.controller.status.history.StatusHistory; + import org.apache.nifi.controller.status.history.StatusHistoryUtil; + import org.apache.nifi.controller.status.history.StatusSnapshot; + import org.apache.nifi.diagnostics.GarbageCollection; + import org.apache.nifi.diagnostics.StorageUsage; + import org.apache.nifi.diagnostics.SystemDiagnostics; + import org.apache.nifi.encrypt.StringEncryptor; + import org.apache.nifi.engine.FlowEngine; + import org.apache.nifi.events.BulletinFactory; + import org.apache.nifi.events.VolatileBulletinRepository; + import org.apache.nifi.framework.security.util.SslContextFactory; + import org.apache.nifi.io.socket.multicast.DiscoverableService; ++import org.apache.nifi.logging.ComponentLog; + import org.apache.nifi.logging.NiFiLog; + import org.apache.nifi.nar.ExtensionManager; + import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.nar.NarThreadContextClassLoader; ++import org.apache.nifi.processor.SimpleProcessLogger; + import org.apache.nifi.processor.StandardValidationContextFactory; + import org.apache.nifi.remote.RemoteResourceManager; + import org.apache.nifi.remote.RemoteSiteListener; + import org.apache.nifi.remote.SocketRemoteSiteListener; + import org.apache.nifi.remote.cluster.ClusterNodeInformation; + import org.apache.nifi.remote.cluster.NodeInformation; + import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol; + import org.apache.nifi.reporting.Bulletin; + import org.apache.nifi.reporting.BulletinRepository; + 
import org.apache.nifi.reporting.InitializationException; + import org.apache.nifi.reporting.ReportingInitializationContext; + import org.apache.nifi.reporting.ReportingTask; + import org.apache.nifi.reporting.Severity; + import org.apache.nifi.scheduling.SchedulingStrategy; + import org.apache.nifi.util.DomUtils; + import org.apache.nifi.util.FormatUtils; + import org.apache.nifi.util.NiFiProperties; ++import org.apache.nifi.util.ObjectHolder; ++import org.apache.nifi.util.ReflectionUtils; ++import org.apache.nifi.web.OptimisticLockingManager; + import org.apache.nifi.web.Revision; ++import org.apache.nifi.web.UpdateRevision; + import org.apache.nifi.web.api.dto.FlowSnippetDTO; + import org.apache.nifi.web.api.dto.NodeDTO; + import org.apache.nifi.web.api.dto.ProcessGroupDTO; + import org.apache.nifi.web.api.dto.ProcessorDTO; + import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; + import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; + import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; + import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; + import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; + import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; + import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; + import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; + import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO; + import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; + import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; + import org.apache.nifi.web.api.entity.FlowSnippetEntity; + import org.apache.nifi.web.api.entity.ProcessGroupEntity; + import org.apache.nifi.web.api.entity.ProcessorEntity; + import org.apache.nifi.web.api.entity.ProcessorsEntity; + import org.apache.nifi.web.api.entity.ProvenanceEntity; + import org.apache.nifi.web.api.entity.ProvenanceEventEntity; + import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; + import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; + import org.apache.nifi.web.util.WebUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.w3c.dom.DOMException; + import org.w3c.dom.Document; + import org.w3c.dom.Element; + import org.w3c.dom.NodeList; + import org.xml.sax.SAXException; + import org.xml.sax.SAXParseException; + + import com.sun.jersey.api.client.ClientResponse; ++import org.apache.nifi.controller.service.ControllerServiceState; ++import org.apache.nifi.web.api.dto.ControllerServiceDTO; ++import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; ++import org.apache.nifi.web.api.dto.ReportingTaskDTO; ++import org.apache.nifi.web.api.entity.ControllerServiceEntity; ++import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; ++import org.apache.nifi.web.api.entity.ControllerServicesEntity; ++import org.apache.nifi.web.api.entity.ReportingTaskEntity; ++import org.apache.nifi.web.api.entity.ReportingTasksEntity; + + /** + * Provides a cluster manager implementation. The manager federates incoming + * HTTP client requests to the nodes' external API using the HTTP protocol. The + * manager also communicates with nodes using the nodes' internal socket + * protocol. + * + * The manager's socket address may broadcasted using multicast if a + * MulticastServiceBroadcaster instance is set on this instance. The manager + * instance must be started after setting the broadcaster. 
+ * + * The manager may be configured with an EventManager for recording noteworthy + * lifecycle events (e.g., first heartbeat received, node status change). + * + * The start() and stop() methods must be called to initialize and stop the + * instance. + * + * @author unattributed + */ -public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider { ++public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider { + + public static final String ROOT_GROUP_ID_ALIAS = "root"; + public static final String BULLETIN_CATEGORY = "Clustering"; + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class)); + private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat")); + + /** + * The HTTP header to store a cluster context. An example of what may be + * stored in the context is a node's auditable actions in response to a + * cluster request. The cluster context is serialized using Java's + * serialization mechanism and hex encoded. + */ + public static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext"; + + /** + * HTTP Header that stores a unique ID for each request that is replicated + * to the nodes. This is used for logging purposes so that request + * information, such as timing, can be correlated between the NCM and the + * nodes + */ + public static final String REQUEST_ID_HEADER = "X-RequestID"; + + /** + * The HTTP header that the NCM specifies to ask a node if they are able to + * process a given request. The value is always 150-NodeContinue. The node + * will respond with 150 CONTINUE if it is able to process the request, 417 + * EXPECTATION_FAILED otherwise. + */ + public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects"; + public static final int NODE_CONTINUE_STATUS_CODE = 150; + + /** + * The HTTP header that the NCM specifies to indicate that a node should + * invalidate the specified user group. This is done to ensure that user + * cache is not stale when an administrator modifies a group through the UI. + */ + public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = "X-ClusterInvalidateUserGroup"; + + /** + * The HTTP header that the NCM specifies to indicate that a node should + * invalidate the specified user. This is done to ensure that user cache is + * not stale when an administrator modifies a user through the UI. + */ + public static final String CLUSTER_INVALIDATE_USER_HEADER = "X-ClusterInvalidateUser"; + + /** + * The default number of seconds to respond to a connecting node if the + * manager cannot provide it with a current data flow. 
+ */ + private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5; + + public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository"; + + public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); + public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}"); + + public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); + public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}"); + + public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))"); + public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance"); + public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance"); + + public static final String PROVENANCE_URI = "/nifi-api/controller/provenance"; + public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}"); + public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+"); - ++ ++ public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node"; ++ public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}"); ++ public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references"); ++ public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; ++ public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); ++ + private final NiFiProperties properties; + private final HttpRequestReplicator httpRequestReplicator; + private final HttpResponseMapper httpResponseMapper; + private final DataFlowManagementService dataFlowManagementService; + private final ClusterManagerProtocolSenderListener senderListener; ++ private final OptimisticLockingManager optimisticLockingManager; + private final StringEncryptor encryptor; + private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>(); + private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock(); + private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read"); + private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write"); + + private final Set<Node> nodes = new HashSet<>(); - private final Set<ReportingTaskNode> reportingTasks = new HashSet<>(); ++ private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>(); + + // null means the dataflow should be read from disk + private StandardDataFlow cachedDataFlow = null; + private 
NodeIdentifier primaryNodeId = null; - private Revision revision = new Revision(0L, ""); + private Timer heartbeatMonitor; + private Timer heartbeatProcessor; + private volatile ClusterServicesBroadcaster servicesBroadcaster = null; + private volatile EventManager eventManager = null; + private volatile ClusterNodeFirewall clusterFirewall = null; + private volatile AuditService auditService = null; + private volatile ControllerServiceProvider controllerServiceProvider = null; + + private final RemoteSiteListener remoteSiteListener; + private final Integer remoteInputPort; + private final Boolean remoteCommsSecure; + private final BulletinRepository bulletinRepository; + private final String instanceId; + private final FlowEngine reportingTaskEngine; + private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>(); + private final StandardProcessScheduler processScheduler; + private final long componentStatusSnapshotMillis; + + public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper, + final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener, - final NiFiProperties properties, final StringEncryptor encryptor) { ++ final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) { + + if (httpRequestReplicator == null) { + throw new IllegalArgumentException("HttpRequestReplicator may not be null."); + } else if (httpResponseMapper == null) { + throw new IllegalArgumentException("HttpResponseMapper may not be null."); + } else if (dataFlowManagementService == null) { + throw new IllegalArgumentException("DataFlowManagementService may not be null."); + } else if (senderListener == null) { + throw new IllegalArgumentException("ClusterManagerProtocolSenderListener may not be null."); + } else if (properties == null) { + throw new IllegalArgumentException("NiFiProperties may not be null."); + } + + // Ensure that our encryptor/decryptor is properly initialized + this.httpRequestReplicator = httpRequestReplicator; + this.httpResponseMapper = httpResponseMapper; + this.dataFlowManagementService = dataFlowManagementService; + this.properties = properties; - this.controllerServiceProvider = new StandardControllerServiceProvider(); + this.bulletinRepository = new VolatileBulletinRepository(); + this.instanceId = UUID.randomUUID().toString(); + this.senderListener = senderListener; + this.encryptor = encryptor; ++ this.optimisticLockingManager = optimisticLockingManager; + senderListener.addHandler(this); + senderListener.setBulletinRepository(bulletinRepository); + + final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + long snapshotMillis; + try { + snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); + } + componentStatusSnapshotMillis = snapshotMillis; + + remoteInputPort = properties.getRemoteInputPort(); + if (remoteInputPort == null) { + remoteSiteListener = null; + remoteCommsSecure = null; + } else { + // Register the ClusterManagerServerProtocol as the appropriate resource for site-to-site Server Protocol + 
RemoteResourceManager.setServerProtocolImplementation(ClusterManagerServerProtocol.RESOURCE_NAME, ClusterManagerServerProtocol.class); + remoteCommsSecure = properties.isSiteToSiteSecure(); + if (remoteCommsSecure) { + final SSLContext sslContext = SslContextFactory.createSslContext(properties, false); + + if (sslContext == null) { + throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured"); + } + + remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), sslContext, this); + } else { + remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), null, this); + } + } + + reportingTaskEngine = new FlowEngine(8, "Reporting Task Thread"); + + processScheduler = new StandardProcessScheduler(new Heartbeater() { + @Override + public void heartbeat() { + } + }, this, encryptor); ++ ++ // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only ++ // going to be scheduling Reporting Tasks. Otherwise, it would not be okay. + processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor)); ++ processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor)); + processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); + processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); ++ ++ controllerServiceProvider = new StandardControllerServiceProvider(processScheduler); + } + + public void start() throws IOException { + writeLock.lock(); + try { + + if (isRunning()) { + throw new IllegalStateException("Instance is already started."); + } + + try { + // setup heartbeat monitoring + heartbeatMonitor = new Timer("Heartbeat Monitor", /* is daemon */ true); + heartbeatMonitor.scheduleAtFixedRate(new HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 1000); + + heartbeatProcessor = new Timer("Process Pending Heartbeats", true); + final int processPendingHeartbeatDelay = 1000 * Math.max(1, getClusterProtocolHeartbeatSeconds() / 2); + heartbeatProcessor.schedule(new ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, processPendingHeartbeatDelay); + + // start request replication service + httpRequestReplicator.start(); + + // start protocol service + senderListener.start(); + + // start flow management service + dataFlowManagementService.start(); + + if (remoteSiteListener != null) { + remoteSiteListener.start(); + } + + // load flow ++ final ClusterDataFlow clusterDataFlow; + if (dataFlowManagementService.isFlowCurrent()) { - final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow(); ++ clusterDataFlow = dataFlowManagementService.loadDataFlow(); + cachedDataFlow = clusterDataFlow.getDataFlow(); + primaryNodeId = clusterDataFlow.getPrimaryNodeId(); + } else { + throw new IOException("Flow is not current."); + } + ++ final byte[] serializedServices = clusterDataFlow.getControllerServices(); ++ if ( serializedServices != null && serializedServices.length > 0 ) { ++ ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState()); ++ } ++ + // start multicast broadcasting service, if configured + if (servicesBroadcaster != null) { + servicesBroadcaster.start(); + } + + // start in safe mode + 
executeSafeModeTask(); + + // Load and start running Reporting Tasks - final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE)); - reportingTasks.addAll(loadReportingTasks(taskFile)); ++ final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks(); ++ if ( serializedReportingTasks != null && serializedReportingTasks.length > 0 ) { ++ loadReportingTasks(serializedReportingTasks); ++ } + } catch (final IOException ioe) { + logger.warn("Failed to initialize cluster services due to: " + ioe, ioe); + stop(); + throw ioe; + } + + } finally { + writeLock.unlock("START"); + } + } + + public void stop() throws IOException { + writeLock.lock(); + try { + + // returns true if any service is running + if (isRunning() == false) { + throw new IllegalArgumentException("Instance is already stopped."); + } + + boolean encounteredException = false; + + // stop the heartbeat monitoring + if (isHeartbeatMonitorRunning()) { + heartbeatMonitor.cancel(); + heartbeatMonitor = null; + } + + if (heartbeatProcessor != null) { + heartbeatProcessor.cancel(); + heartbeatProcessor = null; + } + + // stop the HTTP request replicator service + if (httpRequestReplicator.isRunning()) { + httpRequestReplicator.stop(); + } + + // stop the flow management service + if (dataFlowManagementService.isRunning()) { + dataFlowManagementService.stop(); + } + + if (remoteSiteListener != null) { + remoteSiteListener.stop(); + } + + // stop the protocol listener service + if (senderListener.isRunning()) { + try { + senderListener.stop(); + } catch (final IOException ioe) { + encounteredException = true; + logger.warn("Failed to shutdown protocol service due to: " + ioe, ioe); + } + } + + // stop the service broadcaster + if (isBroadcasting()) { + servicesBroadcaster.stop(); + } + + if ( processScheduler != null ) { + processScheduler.shutdown(); + } + + if (encounteredException) { + throw new IOException("Failed to shutdown Cluster Manager because one or more cluster services failed to shutdown. 
Check the logs for details."); + } + + } finally { + writeLock.unlock("STOP"); + } + } + + public boolean isRunning() { + readLock.lock(); + try { + return isHeartbeatMonitorRunning() + || httpRequestReplicator.isRunning() + || senderListener.isRunning() + || dataFlowManagementService.isRunning() + || isBroadcasting(); + } finally { + readLock.unlock("isRunning"); + } + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return MessageType.CONNECTION_REQUEST == msg.getType() + || MessageType.HEARTBEAT == msg.getType() + || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType() + || MessageType.BULLETINS == msg.getType() + || MessageType.RECONNECTION_FAILURE == msg.getType(); + } + + @Override + public ProtocolMessage handle(final ProtocolMessage protocolMessage) throws ProtocolException { + switch (protocolMessage.getType()) { + case CONNECTION_REQUEST: + return handleConnectionRequest((ConnectionRequestMessage) protocolMessage); + case HEARTBEAT: + final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) protocolMessage; + + final Heartbeat original = heartbeatMessage.getHeartbeat(); + final NodeIdentifier originalNodeId = original.getNodeIdentifier(); + final Heartbeat heartbeatWithDn = new Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), original.isPrimary(), original.isConnected(), original.getPayload()); + + handleHeartbeat(heartbeatWithDn); + return null; + case CONTROLLER_STARTUP_FAILURE: + new Thread(new Runnable() { + @Override + public void run() { + handleControllerStartupFailure((ControllerStartupFailureMessage) protocolMessage); + } + }, "Handle Controller Startup Failure Message from " + ((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start(); + return null; + case RECONNECTION_FAILURE: + new Thread(new Runnable() { + @Override + public void run() { + handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage); + } + }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start(); + return null; + case BULLETINS: + final NodeBulletinsMessage bulletinsMessage = (NodeBulletinsMessage) protocolMessage; + handleBulletins(bulletinsMessage.getBulletins()); + return null; + default: + throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType()); + } + } + + /** + * Services connection requests. If the data flow management service is + * unable to provide a current copy of the data flow, then the returned + * connection response will indicate the node should try later. Otherwise, + * the connection response will contain the the flow and the node + * identifier. + * + * If this instance is configured with a firewall and the request is + * blocked, then the response will not contain a node identifier. + * + * @param request a connection request + * + * @return a connection response + */ + @Override + public ConnectionResponse requestConnection(final ConnectionRequest request) { + final boolean lockObtained = writeLock.tryLock(3, TimeUnit.SECONDS); + if (!lockObtained) { + // Create try-later response because we are too busy to service the request right now. 
We do not want + // to wait long because we want Node/NCM comms to be very responsive + final int tryAgainSeconds; + if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) { + tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS; + } else { + tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds(); + } + + // record event + final String msg = "Connection requested from node, but manager was too busy to service request. Instructing node to try again in " + tryAgainSeconds + " seconds."; + addEvent(request.getProposedNodeIdentifier(), msg); + addBulletin(request.getProposedNodeIdentifier(), Severity.INFO, msg); + + // return try later response + return new ConnectionResponse(tryAgainSeconds); + } + + try { + // resolve the proposed node identifier to a valid node identifier + final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(request.getProposedNodeIdentifier()); + + if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) { + // if the socket address is not listed in the firewall, then return a null response + logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier); + return ConnectionResponse.createBlockedByFirewallResponse(); + } + + // get a raw reference to the node (if it doesn't exist, node will be null) + Node node = getRawNode(resolvedNodeIdentifier.getId()); + + // create a new node if necessary and set status to connecting + if (node == null) { + node = new Node(resolvedNodeIdentifier, Status.CONNECTING); + addEvent(node.getNodeId(), "Connection requested from new node. Setting status to connecting."); + nodes.add(node); + } else { + node.setStatus(Status.CONNECTING); + addEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting"); + } + + // record the time of the connection request + node.setConnectionRequestedTimestamp(new Date().getTime()); + + // clear out old heartbeat info + node.setHeartbeat(null); + + // try to obtain a current flow + if (dataFlowManagementService.isFlowCurrent()) { + // if a cached copy does not exist, load it from disk + if (cachedDataFlow == null) { + final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow(); + cachedDataFlow = clusterDataFlow.getDataFlow(); + primaryNodeId = clusterDataFlow.getPrimaryNodeId(); + } + + // determine if this node should be assigned the primary role + final boolean primaryRole; + if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) { + setPrimaryNodeId(node.getNodeId()); + addEvent(node.getNodeId(), "Setting primary role in connection response."); + primaryRole = true; + } else { + primaryRole = false; + } + + return new ConnectionResponse(node.getNodeId(), cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId); + } + + /* + * The manager does not have a current copy of the data flow, + * so it will instruct the node to try connecting at a later + * time. Meanwhile, the flow will be locked down from user + * changes because the node is marked as connecting. 
+ */ + + /* + * Create try-later response based on flow retrieval delay to give + * the flow management service a chance to retrieve a curren flow + */ + final int tryAgainSeconds; + if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) { + tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS; + } else { + tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds(); + } + + // record event + addEvent(node.getNodeId(), "Connection requested from node, but manager was unable to obtain current flow. Instructing node to try again in " + tryAgainSeconds + " seconds."); + + // return try later response + return new ConnectionResponse(tryAgainSeconds); + + } finally { + writeLock.unlock("requestConnection"); + } + } + + /** + * Services reconnection requests for a given node. If the node indicates + * reconnection failure, then the node will be set to disconnected and if + * the node has primary role, then the role will be revoked. Otherwise, a + * reconnection request will be sent to the node, initiating the connection + * handshake. + * + * @param nodeId a node identifier + * + * @throws UnknownNodeException if the node does not exist + * @throws IllegalNodeReconnectionException if the node cannot be + * reconnected because the node is not disconnected + * @throws NodeReconnectionException if the reconnection message failed to + * be sent or the cluster could not provide a current data flow for the + * reconnection request + */ + @Override + public void requestReconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeReconnectionException { + Node node = null; + + final boolean primaryRole; + final int tryAgainSeconds; + + writeLock.lock(); + try { + // check if we know about this node and that it is disconnected + node = getRawNode(nodeId); + logger.info("Request was made by {} to reconnect node {} to cluster", userDn, node == null ? nodeId : node); + + if (node == null) { + throw new UnknownNodeException("Node does not exist."); + } else if (Status.DISCONNECTED != node.getStatus()) { + throw new IllegalNodeReconnectionException("Node must be disconnected before it can reconnect."); + } + + // clear out old heartbeat info + node.setHeartbeat(null); + + // get the dataflow to send with the reconnection request + if (!dataFlowManagementService.isFlowCurrent()) { + /* node remains disconnected */ + final String msg = "Reconnection requested for node, but manager was unable to obtain current flow. Setting node to disconnected."; + addEvent(node.getNodeId(), msg); + addBulletin(node, Severity.WARNING, msg); + throw new NodeReconnectionException("Manager was unable to obtain current flow to provide in reconnection request to node. Try again in a few seconds."); + } + + // if a cached copy does not exist, load it from disk + if (cachedDataFlow == null) { + final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow(); + cachedDataFlow = clusterDataFlow.getDataFlow(); + primaryNodeId = clusterDataFlow.getPrimaryNodeId(); + } + + node.setStatus(Status.CONNECTING); + addEvent(node.getNodeId(), "Reconnection requested for node. 
Setting status to connecting."); + + // determine if this node should be assigned the primary role + if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) { + setPrimaryNodeId(node.getNodeId()); + addEvent(node.getNodeId(), "Setting primary role in reconnection request."); + primaryRole = true; + } else { + primaryRole = false; + } + + if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) { + tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS; + } else { + tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds(); + } + } catch (final UnknownNodeException | IllegalNodeReconnectionException | NodeReconnectionException une) { + throw une; + } catch (final Exception ex) { + logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + ex, ex); + + node.setStatus(Status.DISCONNECTED); + final String eventMsg = "Problem encountered issuing reconnection request. Node will remain disconnected: " + ex; + addEvent(node.getNodeId(), eventMsg); + addBulletin(node, Severity.WARNING, eventMsg); + + // Exception thrown will include node ID but event/bulletin do not because the node/id is passed along with the message + throw new NodeReconnectionException("Problem encountered issuing reconnection request to " + node.getNodeId() + ". Node will remain disconnected: " + ex, ex); + } finally { + writeLock.unlock("requestReconnection"); + } + + // Asynchronously start attempting reconnection. This is not completely thread-safe, as + // we do this by releasing the write lock and then obtaining a read lock for each attempt, + // so we suffer from the ABA problem. However, we are willing to accept the consequences of + // this situation in order to avoid holding a lock for the entire duration. "The consequences" + // are that a second thread could potentially be doing the same thing, issuing a reconnection request. + // However, this is very unlikely to happen, based on the conditions under which we issue a reconnection + // request. And if we do, the node will simply reconnect multiple times, which is not a big deal. + requestReconnectionAsynchronously(node, primaryRole, 10, tryAgainSeconds); + } + + private void requestReconnectionAsynchronously(final Node node, final boolean primaryRole, final int reconnectionAttempts, final int retrySeconds) { + final Thread reconnectionThread = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < reconnectionAttempts; i++) { + final ReconnectionRequestMessage request = new ReconnectionRequestMessage(); + + try { + readLock.lock(); + try { + if (Status.CONNECTING != node.getStatus()) { + // the node status has changed. It's no longer appropriate to attempt reconnection. + return; + } + + // create the request + request.setNodeId(node.getNodeId()); + request.setDataFlow(cachedDataFlow); + request.setPrimary(primaryRole); + request.setManagerRemoteSiteCommsSecure(remoteCommsSecure); + request.setManagerRemoteSiteListeningPort(remoteInputPort); + request.setInstanceId(instanceId); + } finally { + readLock.unlock("Reconnect " + node.getNodeId()); + } + + // Issue a reconnection request to the node. + senderListener.requestReconnection(request); + + node.setConnectionRequestedTimestamp(System.currentTimeMillis()); + + // successfully told node to reconnect -- we're done! 
+ return; + } catch (final Exception e) { + logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + e); + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + + addBulletin(node, Severity.WARNING, "Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + e); + } + + try { + Thread.sleep(1000L * retrySeconds); + } catch (final InterruptedException ie) { + break; + } + } + + // We failed to reconnect 10 times. We must now mark node as disconnected. + writeLock.lock(); + try { + if (Status.CONNECTING == node.getStatus()) { + requestDisconnectionQuietly(node.getNodeId(), "Failed to issue Reconnection Request " + reconnectionAttempts + " times"); + } + } finally { + writeLock.unlock("Mark node as Disconnected as a result of reconnection failure"); + } + } + }, "Reconnect " + node.getNodeId()); + + reconnectionThread.start(); + } + - private List<ReportingTaskNode> loadReportingTasks(final File taskConfigXml) { - final List<ReportingTaskNode> tasks = new ArrayList<>(); - if (taskConfigXml == null) { - logger.info("No controller tasks to start"); - return tasks; - } ++ private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] serialized) { ++ final Map<String, ReportingTaskNode> tasks = new HashMap<>(); + + try { - final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd"); - final Document document = parse(taskConfigXml, schemaUrl); ++ final Document document = parse(serialized); + - final NodeList tasksNodes = document.getElementsByTagName("tasks"); ++ final NodeList tasksNodes = document.getElementsByTagName("reportingTasks"); + final Element tasksElement = (Element) tasksNodes.item(0); + + //optional properties for all ReportingTasks - for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) { ++ for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) { + //add global properties common to all tasks + Map<String, String> properties = new HashMap<>(); + + //get properties for the specific reporting task - id, name, class, + //and schedulingPeriod must be set + final String taskId = DomUtils.getChild(taskElement, "id").getTextContent().trim(); + final String taskName = DomUtils.getChild(taskElement, "name").getTextContent().trim(); + + final List<Element> schedulingStrategyNodeList = DomUtils.getChildElementsByTagName(taskElement, "schedulingStrategy"); + String schedulingStrategyValue = SchedulingStrategy.TIMER_DRIVEN.name(); + if (schedulingStrategyNodeList.size() == 1) { + final String specifiedValue = schedulingStrategyNodeList.get(0).getTextContent(); + + try { + schedulingStrategyValue = SchedulingStrategy.valueOf(specifiedValue).name(); + } catch (final Exception e) { + throw new RuntimeException("Cannot start Reporting Task with id " + taskId + " because its Scheduling Strategy does not have a valid value", e); + } + } + + final SchedulingStrategy schedulingStrategy = SchedulingStrategy.valueOf(schedulingStrategyValue); + final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim(); + final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim(); + + //optional task-specific properties + for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) { + final String name = optionalProperty.getAttribute("name"); + final String value = optionalProperty.getTextContent().trim(); 
+ properties.put(name, value); + } + + //set the class to be used for the configured reporting task + final ReportingTaskNode reportingTaskNode; + try { - reportingTaskNode = createReportingTask(taskClass, taskId); ++ reportingTaskNode = createReportingTask(taskClass, taskId, false); + } catch (final ReportingTaskInstantiationException e) { + logger.error("Unable to load reporting task {} due to {}", new Object[]{taskId, e}); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + continue; + } + + final ReportingTask reportingTask = reportingTaskNode.getReportingTask(); + - final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, schedulingStrategy, taskSchedulingPeriod, this); ++ final ComponentLog componentLog = new SimpleProcessLogger(taskId, reportingTask); ++ final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, ++ schedulingStrategy, taskSchedulingPeriod, componentLog, this); + reportingTask.initialize(config); + + final Map<PropertyDescriptor, String> resolvedProps; + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + resolvedProps = new HashMap<>(); + for (final Map.Entry<String, String> entry : properties.entrySet()) { + final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey()); + resolvedProps.put(descriptor, entry.getValue()); + } + } + + for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) { + reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue()); + } + + processScheduler.schedule(reportingTaskNode); - tasks.add(reportingTaskNode); ++ tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode); + } + } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) { - logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t}); ++ logger.error("Unable to load reporting tasks due to {}", new Object[]{t}); + if (logger.isDebugEnabled()) { + logger.error("", t); + } + } + + return tasks; + } + - private ReportingTaskNode createReportingTask(final String type, final String id) throws ReportingTaskInstantiationException { ++ ++ @Override ++ public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { + if (type == null) { + throw new NullPointerException(); + } + ReportingTask task = null; + final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type); + final Class<?> rawClass; + if (detectedClassLoader == null) { + rawClass = Class.forName(type); + } else { + rawClass = Class.forName(type, false, detectedClassLoader); + } + + Thread.currentThread().setContextClassLoader(detectedClassLoader); + final Class<? 
extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class); + final Object reportingTaskObj = reportingTaskClass.newInstance(); + task = reportingTaskClass.cast(reportingTaskObj); + } catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) { + throw new ReportingTaskInstantiationException(type, t); + } finally { + if (ctxClassLoader != null) { + Thread.currentThread().setContextClassLoader(ctxClassLoader); + } + } + + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this); + final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler, + new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory); ++ taskNode.setName(task.getClass().getSimpleName()); ++ ++ reportingTasks.put(id, taskNode); ++ if ( firstTimeAdded ) { ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task); ++ } catch (final Exception e) { ++ throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e); ++ } ++ } ++ + return taskNode; + } + - private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException { - final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); - final Schema schema = schemaFactory.newSchema(schemaUrl); ++ private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException { + final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); - docFactory.setSchema(schema); + final DocumentBuilder builder = docFactory.newDocumentBuilder(); + + builder.setErrorHandler(new org.xml.sax.ErrorHandler() { + @Override + public void fatalError(final SAXParseException err) throws SAXException { + logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage()); + if (logger.isDebugEnabled()) { + logger.error("Error Stack Dump", err); + } + throw err; + } + + @Override + public void error(final SAXParseException err) throws SAXParseException { + logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage()); + if (logger.isDebugEnabled()) { + logger.error("Error Stack Dump", err); + } + throw err; + } + + @Override + public void warning(final SAXParseException err) throws SAXParseException { + logger.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage()); + if (logger.isDebugEnabled()) { + logger.warn("Warning stack dump", err); + } + throw err; + } + }); + + // build the docuemnt - final Document document = builder.parse(xmlFile); - - // ensure schema compliance - final Validator validator = schema.newValidator(); - validator.validate(new DOMSource(document)); - ++ final Document document = builder.parse(new ByteArrayInputStream(serialized)); + return document; + } + + private void addBulletin(final Node node, final Severity severity, final String msg) { + addBulletin(node.getNodeId(), severity, msg); + } + + private void addBulletin(final NodeIdentifier nodeId, final Severity severity, final String msg) { + 
bulletinRepository.addBulletin(BulletinFactory.createBulletin(BULLETIN_CATEGORY, severity.toString(), + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg)); + } + + /** + * Services a disconnection request. + * + * @param nodeId a node identifier + * @param userDn the DN of the user requesting the disconnection + * + * @throws UnknownNodeException if the node does not exist + * @throws IllegalNodeDisconnectionException if the node cannot be + * disconnected due to the cluster's state (e.g., node is last connected + * node or node is primary) + * @throws NodeDisconnectionException if the disconnection message fails to + * be sent. + */ + @Override + public void requestDisconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException { + writeLock.lock(); + try { + // check that the node is known + final Node node = getNode(nodeId); + if (node == null) { + throw new UnknownNodeException("Node does not exist."); + } + requestDisconnection(node.getNodeId(), /* ignore last node */ false, "User " + userDn + " Disconnected Node"); + } finally { + writeLock.unlock("requestDisconnection(String)"); + } + } + + /** + * Requests a disconnection to the node with the given node ID, but any + * exception thrown is suppressed. + * + * @param nodeId the node ID + */ + private void requestDisconnectionQuietly(final NodeIdentifier nodeId, final String explanation) { + try { + requestDisconnection(nodeId, /* ignore node check */ true, explanation); + } catch (final IllegalNodeDisconnectionException | NodeDisconnectionException ex) { /* suppress exception */ } + } + + /** + * Issues a disconnection message to the node identified by the given node + * ID. If the node is not known, then a UnknownNodeException is thrown. If + * the node cannot be disconnected due to the cluster's state and + * ignoreLastNodeCheck is false, then a IllegalNodeDisconnectionException is + * thrown. Otherwise, a disconnection message is issued to the node. + * + * Whether the disconnection message is successfully sent to the node, the + * node is marked as disconnected and if the node is the primary node, then + * the primary role is revoked. + * + * @param nodeId the ID of the node + * @param ignoreNodeChecks if false, checks will be made to ensure the + * cluster supports the node's disconnection (e.g., the node is not the last + * connected node in the cluster; the node is not the primary); otherwise, + * the request is made regardless of the cluster state + * @param explanation + * + * @throws IllegalNodeDisconnectionException if the node cannot be + * disconnected due to the cluster's state (e.g., node is last connected + * node or node is primary). Not thrown if ignoreNodeChecks is true. + * @throws NodeDisconnectionException if the disconnection message fails to + * be sent. 
+ */ + private void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation) + throws IllegalNodeDisconnectionException, NodeDisconnectionException { + + writeLock.lock(); + try { + + // check that the node is known + final Node node = getRawNode(nodeId.getId()); + if (node == null) { + if (ignoreNodeChecks) { + // issue the disconnection + final DisconnectMessage request = new DisconnectMessage(); + request.setNodeId(nodeId); + request.setExplanation(explanation); + + addEvent(nodeId, "Disconnection requested due to " + explanation); + senderListener.disconnect(request); + addEvent(nodeId, "Node disconnected due to " + explanation); + addBulletin(nodeId, Severity.INFO, "Node disconnected due to " + explanation); + return; + } else { + throw new UnknownNodeException("Node does not exist"); + } + } + + // if necessary, check that the node may be disconnected + if (!ignoreNodeChecks) { + final Set<NodeIdentifier> connectedNodes = getNodeIds(Status.CONNECTED); + // cannot disconnect the last connected node in the cluster + if (connectedNodes.size() == 1 && connectedNodes.iterator().next().equals(nodeId)) { + throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the only connected node in the cluster."); + } else if (isPrimaryNode(nodeId)) { + // cannot disconnect the primary node in the cluster + throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the primary node in the cluster."); + } + } + + // update status + node.setStatus(Status.DISCONNECTED); + notifyDataFlowManagementServiceOfNodeStatusChange(); + + // issue the disconnection + final DisconnectMessage request = new DisconnectMessage(); + request.setNodeId(nodeId); + request.setExplanation(explanation); + + addEvent(nodeId, "Disconnection requested due to " + explanation); + senderListener.disconnect(request); + addEvent(nodeId, "Node disconnected due to " + explanation); + addBulletin(node, Severity.INFO, "Node disconnected due to " + explanation); + } finally { + writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)"); + } + } + + /** + * Messages the node to have the primary role. If the messaging fails, then + * the node is marked as disconnected. 
+      *
+      * @param nodeId the node ID to assign primary role
+      *
+      * @return true if primary role assigned; false otherwise
+      */
+     private boolean assignPrimaryRole(final NodeIdentifier nodeId) {
+         writeLock.lock();
+         try {
+             // create primary role message
+             final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
+             msg.setNodeId(nodeId);
+             msg.setPrimary(true);
+             logger.info("Attempting to assign primary role to node: " + nodeId);
+ 
+             // message
+             senderListener.assignPrimaryRole(msg);
+ 
+             logger.info("Assigned primary role to node: " + nodeId);
+             addBulletin(nodeId, Severity.INFO, "Node assigned primary role");
+ 
+             // true indicates primary role assigned
+             return true;
+ 
+         } catch (final ProtocolException ex) {
+ 
+             logger.warn("Failed attempt to assign primary role to node " + nodeId + " due to " + ex);
+             addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role to node due to: " + ex);
+ 
+             // mark node as disconnected and log/record the event
+             final Node node = getRawNode(nodeId.getId());
+             node.setStatus(Status.DISCONNECTED);
+             addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role.");
+ 
+             addBulletin(nodeId, Severity.WARNING, "Node disconnected because of failed attempt to assign primary role");
+ 
+             // false indicates primary role failed to be assigned
+             return false;
+         } finally {
+             writeLock.unlock("assignPrimaryRole");
+         }
+     }
+ 
+     /**
+      * Messages the node with the given node ID to no longer have the primary
+      * role. If the messaging fails, then the node is marked as disconnected.
+      *
+      * @param nodeId the node ID from which the primary role will be revoked
+      * @return true if the primary role was revoked from the node; false
+      * otherwise
+      */
+     private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
+         writeLock.lock();
+         try {
+             // create primary role message
+             final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
+             msg.setNodeId(nodeId);
+             msg.setPrimary(false);
+             logger.info("Attempting to revoke primary role from node: " + nodeId);
+ 
+             // send message
+             senderListener.assignPrimaryRole(msg);
+ 
+             logger.info("Revoked primary role from node: " + nodeId);
+             addBulletin(nodeId, Severity.INFO, "Primary Role revoked from node");
+ 
+             // true indicates primary role was revoked
+             return true;
+         } catch (final ProtocolException ex) {
+ 
+             logger.warn("Failed attempt to revoke primary role from node " + nodeId + " due to " + ex);
+ 
+             // mark node as disconnected and log/record the event
+             final Node node = getRawNode(nodeId.getId());
+             node.setStatus(Status.DISCONNECTED);
+             addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role.");
+             addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role");
+ 
+             // false indicates primary role failed to be revoked
+             return false;
+         } finally {
+             writeLock.unlock("revokePrimaryRole");
+         }
+     }
+ 
+     private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
+         return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(),
+                 nodeId.getApiPort(), nodeId.getSocketAddress(), nodeId.getSocketPort(), dn);
+     }
+ 
+     private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) {
+         final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
+         final ConnectionRequest requestWithDn = new ConnectionRequest(addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN()));
+ 
+         final ConnectionResponse response = requestConnection(requestWithDn);
+ 
+         final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
+         responseMessage.setConnectionResponse(response);
+         return responseMessage;
+     }
+ 
+     private void handleControllerStartupFailure(final ControllerStartupFailureMessage msg) {
+         writeLock.lock();
+         try {
+             final Node node = getRawNode(msg.getNodeId().getId());
+             if (node != null) {
+                 node.setStatus(Status.DISCONNECTED);
+                 addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage());
+                 addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage());
+             }
+         } finally {
+             writeLock.unlock("handleControllerStartupFailure");
+         }
+     }
+ 
+     private void handleReconnectionFailure(final ReconnectionFailureMessage msg) {
+         writeLock.lock();
+         try {
+             final Node node = getRawNode(msg.getNodeId().getId());
+             if (node != null) {
+                 node.setStatus(Status.DISCONNECTED);
+                 final String errorMsg = "Node could not rejoin cluster. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage();
+                 addEvent(msg.getNodeId(), errorMsg);
+                 addBulletin(node, Severity.ERROR, errorMsg);
+             }
+         } finally {
+             writeLock.unlock("handleReconnectionFailure");
+         }
+     }
- 
++ 
++    /**
++     * Adds an instance of the specified controller service.
++     *
++     * @param type the fully qualified type of the controller service
++     * @param id the identifier to assign to the controller service
++     * @param firstTimeAdded whether this is the first time the service has been added
++     * @return the created controller service node
++     */
++    @Override
++    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
++        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
++    }
+ 
+     @Override
+     public ControllerService getControllerService(String serviceIdentifier) {
+         return controllerServiceProvider.getControllerService(serviceIdentifier);
+     }
+ 
+     @Override
+     public ControllerServiceNode getControllerServiceNode(final String id) {
+         return controllerServiceProvider.getControllerServiceNode(id);
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final ControllerService service) {
+         return controllerServiceProvider.isControllerServiceEnabled(service);
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+         return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
+     }
+ 
+     @Override
-     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-         return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
++    public String getControllerServiceName(final String serviceIdentifier) {
++        return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
+     }
- 
++ 
+     @Override
+     public void removeControllerService(final ControllerServiceNode serviceNode) {
+         controllerServiceProvider.removeControllerService(serviceNode);
+     }
+ 
+ 
+     @Override
+     public void enableControllerService(final ControllerServiceNode serviceNode) {
+         controllerServiceProvider.enableControllerService(serviceNode);
+     }
+ 
+     @Override
+     public void disableControllerService(final ControllerServiceNode serviceNode) {
+         controllerServiceProvider.disableControllerService(serviceNode);
+     }
+ 
++    @Override
++    public Set<ControllerServiceNode> getAllControllerServices() {
++        return controllerServiceProvider.getAllControllerServices();
++    }
++ 
++ 
++    @Override
++    public void disableReferencingServices(final ControllerServiceNode serviceNode) {
++        controllerServiceProvider.disableReferencingServices(serviceNode);
++    }
++ 
++    @Override
++    public void enableReferencingServices(final ControllerServiceNode serviceNode) {
++        controllerServiceProvider.enableReferencingServices(serviceNode);
++    }
++ 
++    @Override
++    public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
++        controllerServiceProvider.scheduleReferencingComponents(serviceNode);
++    }
++ 
++    @Override
++    public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
++        controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
++    }
++ 
++    @Override
++    public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
++        controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
++    }
++ 
++    @Override
++    public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
++        controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
++    }
++ 
++    @Override
++    public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
++        controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
++    }
++ 
++    @Override
++    public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
++        controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
++    }
++ 
++    private byte[] serialize(final Document doc) throws TransformerException {
++        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
++        final DOMSource domSource = new DOMSource(doc);
++        final StreamResult streamResult = new StreamResult(baos);
++ 
++        // configure the transformer and convert the DOM
++        final TransformerFactory transformFactory = TransformerFactory.newInstance();
++        final Transformer transformer = transformFactory.newTransformer();
++        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
++        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
++ 
++        // transform the document to byte stream
++        transformer.transform(domSource, streamResult);
++        return baos.toByteArray();
++    }
++ 
++    private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
++        final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
++        final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
++        final Document document = docBuilder.newDocument();
++        final Element rootElement = document.createElement("controllerServices");
++        document.appendChild(rootElement);
++ 
++        for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
++            StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
++        }
++ 
++        return serialize(document);
++    }
++ 
++    private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
++        final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
++        final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
++        final Document document = docBuilder.newDocument();
++        final Element rootElement = document.createElement("reportingTasks");
++        document.appendChild(rootElement);
++ 
++        for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
++            StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
++        }
++ 
++        return serialize(document);
++    }
++ 
++ 
++    public void saveControllerServices() {
++        try {
++            dataFlowManagementService.updateControllerServices(serializeControllerServices());
++        } catch (final Exception e) {
++            logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
++            if ( logger.isDebugEnabled() ) {
++                logger.error("", e);
++            }
++ 
++            getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(),
++                "Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
++        }
++    }
++ 
++    public void saveReportingTasks() {
++        try {
++            dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
++        } catch (final Exception e) {
++            logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
++            if ( logger.isDebugEnabled() ) {
++                logger.error("", e);
++            }
++ 
++            getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(),
++                "Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
++        }
++    }
++ 
++    @Override
++    public Set<ReportingTaskNode> getAllReportingTasks() {
++        readLock.lock();
++        try {
++            return new HashSet<>(reportingTasks.values());
++        } finally {
++            readLock.unlock("getAllReportingTasks");
++        }
++    }
++ 
++    @Override
++    public ReportingTaskNode getReportingTaskNode(final String taskId) {
++        readLock.lock();
++        try {
++            return reportingTasks.get(taskId);
++        } finally {
++            readLock.unlock("getReportingTaskNode");
++        }
++    }
++ 
++    @Override
++    public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
++        reportingTaskNode.verifyCanStart();
++        processScheduler.schedule(reportingTaskNode);
++    }
++ 
++ 
++    @Override
++    public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
++        reportingTaskNode.verifyCanStop();
++        processScheduler.unschedule(reportingTaskNode);
++    }
++ 
++    @Override
++    public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
++        writeLock.lock();
++        try {
++            final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
++            if ( existing == null || existing != reportingTaskNode ) {
++                throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
++            }
++ 
++            reportingTaskNode.verifyCanDelete();
++ 
++            try (final NarCloseable x = NarCloseable.withNarLoader()) {
++                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
++            }
++ 
++            for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) {
++                final PropertyDescriptor descriptor = entry.getKey();
++                if (descriptor.getControllerServiceDefinition() != null ) {
++                    final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
++                    if ( value != null ) {
++                        final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
++                        if ( serviceNode != null ) {
++                            serviceNode.removeReference(reportingTaskNode);
++                        }
++                    }
++                }
++            }
++ 
++            reportingTasks.remove(reportingTaskNode.getIdentifier());
++        } finally {
++            writeLock.unlock("removeReportingTask");
++        }
++    }
++ 
++ 
++    @Override
++    public void disableReportingTask(final ReportingTaskNode reportingTask) {
++        reportingTask.verifyCanDisable();
++        processScheduler.disableReportingTask(reportingTask);
++    }
++ 
++    @Override
++    public void enableReportingTask(final ReportingTaskNode reportingTask) {
++        reportingTask.verifyCanEnable();
++        processScheduler.enableReportingTask(reportingTask);
++    }
++ 
+ 
+     /**
+      * Handle a bulletins message.
+      *
+      * @param bulletins the bulletins message sent by a node
+      */
+     public void handleBulletins(final NodeBulletins bulletins) {
+         final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
+         final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
+ 
+         // unmarshal the message
+         BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
+         for (final Bulletin bulletin : payload.getBulletins()) {
+             bulletin.setNodeAddress(nodeAddress);
+             bulletinRepository.addBulletin(bulletin);
+         }
+     }
+ 
+     /**
+      * Handles a node's heartbeat. If this heartbeat is a node's first heartbeat
+      * since its connection request, then the manager will mark the node as
+      * connected. If the node was previously disconnected due to a lack of
+      * heartbeat, then a reconnection request is issued. If the node was
+      * disconnected for other reasons, then a disconnection request is issued.
+      * If this instance is configured with a firewall and the heartbeat is
+      * blocked, then a disconnection request is issued.
+      *
+      * @param heartbeat the heartbeat to process
+      */
+     @Override
+     public void handleHeartbeat(final Heartbeat heartbeat) {
+         // sanity check heartbeat
+         if (heartbeat == null) {
+             throw new IllegalArgumentException("Heartbeat may not be null.");
+         } else if (heartbeat.getNodeIdentifier() == null) {
+             throw new IllegalArgumentException("Heartbeat does not contain a node ID.");
+         }
+ 
+         /*
+          * Processing a heartbeat requires a write lock, which may take a while
+          * to obtain. Only the last heartbeat per node needs to be processed.
+          * Furthermore, since many could pile up, heartbeats are processed in
+          * bulk.
+          *
+          * The below queue stores the pending heartbeats.
+          */
+         pendingHeartbeats.add(heartbeat);
+     }
+ 
+     private void processPendingHeartbeats() {
+         Node node;
+ 
+         writeLock.lock();
+         try {
+             /*
+              * Get the most recent heartbeats for the nodes in the cluster. This
+              * is achieved by "draining" the pending heartbeats queue, populating
+              * a map that associates a node identifier with its latest heartbeat, and
+              * finally, getting the values of the map.
+              */
+             final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = new HashMap<>();
+             Heartbeat aHeartbeat;
+             while ((aHeartbeat = pendingHeartbeats.poll()) != null) {
+                 mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), aHeartbeat);
+             }
+             final Collection<Heartbeat> mostRecentHeartbeats = new ArrayList<>(mostRecentHeartbeatsMap.values());
+ 
+             // return fast if no work to do
+             if (mostRecentHeartbeats.isEmpty()) {
+                 return;
+             }
+ 
+             logNodes("Before Heartbeat Processing", heartbeatLogger);
+ 
+             final int numPendingHeartbeats = mostRecentHeartbeats.size();
+             if (heartbeatLogger.isDebugEnabled()) {
+                 heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
+             }
+ 
+             for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
+                 try {
+                     // resolve the proposed node identifier to valid node identifier
+                     final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier());
+ 
+                     // get a raw reference to the node (if it doesn't exist, node will be null)
+                     node = getRawNode(resolvedNodeIdentifier.getId());
+ 
+                     // if the node thinks it has the primary role, but the manager has assigned the role to a different node, then revoke the role
+                     if (mostRecentHeartbeat.isPrimary() && !isPrimaryNode(resolvedNodeIdentifier)) {
+                         addEvent(resolvedNodeIdentifier, "Heartbeat indicates node is running as primary node. Revoking primary role because primary role is assigned to a different node.");
+                         revokePrimaryRole(resolvedNodeIdentifier);
+                     }
+ 
+                     final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
+ 
+                     if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
+                         if (node == null) {
+                             logger.info("Firewall blocked heartbeat received from unknown node " + resolvedNodeIdentifier + ". Issuing disconnection request.");
+                         } else {
+                             // record event
+                             addEvent(resolvedNodeIdentifier, "Firewall blocked received heartbeat. Issuing disconnection request.");
+                         }
+ 
+                         // request node
<TRUNCATED>
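The handleHeartbeat()/processPendingHeartbeats() pair in the diff coalesces heartbeats: the receive path only enqueues, and the periodic processor drains the queue into a map keyed by node identifier so that only the latest heartbeat per node is handled while the write lock is held. A minimal, self-contained sketch of that drain-and-coalesce pattern follows; the HeartbeatCoalescer class and the NodeId/Heartbeat records are illustrative stand-ins (using records purely for brevity), not types from the diff.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class HeartbeatCoalescer {
    // illustrative stand-ins for the NodeIdentifier/Heartbeat types used above
    record NodeId(String id) {}
    record Heartbeat(NodeId nodeId, long timestamp) {}

    // enqueue-only hot path; no lock is taken when a heartbeat arrives
    private final Queue<Heartbeat> pending = new ConcurrentLinkedQueue<>();

    void handleHeartbeat(final Heartbeat heartbeat) {
        pending.add(heartbeat);
    }

    // drain the queue and keep only the latest heartbeat per node,
    // so piled-up heartbeats are processed in bulk
    Collection<Heartbeat> drainMostRecent() {
        final Map<NodeId, Heartbeat> mostRecent = new HashMap<>();
        Heartbeat heartbeat;
        while ((heartbeat = pending.poll()) != null) {
            mostRecent.put(heartbeat.nodeId(), heartbeat); // later entries win
        }
        return new ArrayList<>(mostRecent.values());
    }
}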
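The serialize(Document), serializeControllerServices(), and serializeReportingTasks() helpers in the diff pretty-print a small DOM into a byte array via the JAXP Transformer API. A standalone sketch of the same serialization approach, runnable on its own; the element names and the wrapper class here are illustrative only.

import java.io.ByteArrayOutputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

public class DomSerializationSketch {
    public static void main(final String[] args) throws Exception {
        // build a tiny DOM, analogous to the "controllerServices" document in the diff
        final DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        final Document doc = builder.newDocument();
        final Element root = doc.createElement("controllerServices");
        doc.appendChild(root);
        root.appendChild(doc.createElement("controllerService"));

        // pretty-print the DOM into a byte array, as serialize(Document) does
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final Transformer transformer = TransformerFactory.newInstance().newTransformer();
        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
        transformer.transform(new DOMSource(doc), new StreamResult(baos));

        System.out.println(baos.toString("UTF-8"));
    }
}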
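Throughout the diff, the cluster manager releases its read/write locks with a method-name label, e.g. writeLock.unlock("requestDisconnection(String)"). The class backing readLock/writeLock is not part of this diff; the sketch below is a hypothetical label-logging wrapper that is merely consistent with the calls shown, not the actual NiFi implementation.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// hypothetical wrapper; shown only to illustrate the labelled unlock(String) call style
class LabelledLock {
    private static final Logger logger = LoggerFactory.getLogger(LabelledLock.class);
    private final Lock lock;

    LabelledLock(final Lock lock) {
        this.lock = lock;
    }

    void lock() {
        lock.lock();
    }

    // the label identifies the method releasing the lock, which makes
    // lock usage easier to trace in log output
    void unlock(final String label) {
        lock.unlock();
        logger.trace("Released lock for {}", label);
    }
}

class LockUsageExample {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    final LabelledLock readLock = new LabelledLock(rwLock.readLock());
    final LabelledLock writeLock = new LabelledLock(rwLock.writeLock());
}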
