http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 0000000,4d5455f..d3688af
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@@ -1,0 -1,3628 +1,4187 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.cluster.manager.impl;
+ 
 -import java.io.File;
++import java.io.ByteArrayInputStream;
++import java.io.ByteArrayOutputStream;
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.io.Serializable;
+ import java.net.URI;
 -import java.net.URL;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.ListIterator;
+ import java.util.Map;
+ import java.util.Queue;
+ import java.util.Set;
+ import java.util.Timer;
+ import java.util.TimerTask;
+ import java.util.TreeMap;
+ import java.util.UUID;
+ import java.util.concurrent.CompletionService;
++import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentLinkedQueue;
++import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutorCompletionService;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ import java.util.regex.Pattern;
+ 
+ import javax.net.ssl.SSLContext;
+ import javax.ws.rs.HttpMethod;
+ import javax.ws.rs.WebApplicationException;
+ import javax.ws.rs.core.StreamingOutput;
 -import javax.xml.XMLConstants;
+ import javax.xml.parsers.DocumentBuilder;
+ import javax.xml.parsers.DocumentBuilderFactory;
+ import javax.xml.parsers.ParserConfigurationException;
++import javax.xml.transform.OutputKeys;
++import javax.xml.transform.Transformer;
++import javax.xml.transform.TransformerException;
++import javax.xml.transform.TransformerFactory;
+ import javax.xml.transform.dom.DOMSource;
 -import javax.xml.validation.Schema;
 -import javax.xml.validation.SchemaFactory;
 -import javax.xml.validation.Validator;
++import javax.xml.transform.stream.StreamResult;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.nifi.admin.service.AuditService;
++import org.apache.nifi.annotation.lifecycle.OnAdded;
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
+ import org.apache.nifi.cluster.BulletinsPayload;
+ import org.apache.nifi.cluster.HeartbeatPayload;
+ import org.apache.nifi.cluster.context.ClusterContext;
+ import org.apache.nifi.cluster.context.ClusterContextImpl;
+ import org.apache.nifi.cluster.event.Event;
+ import org.apache.nifi.cluster.event.EventManager;
+ import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
+ import org.apache.nifi.cluster.flow.ClusterDataFlow;
+ import org.apache.nifi.cluster.flow.DaoException;
+ import org.apache.nifi.cluster.flow.DataFlowManagementService;
+ import org.apache.nifi.cluster.flow.PersistedFlowState;
+ import org.apache.nifi.cluster.manager.HttpClusterManager;
+ import org.apache.nifi.cluster.manager.HttpRequestReplicator;
+ import org.apache.nifi.cluster.manager.HttpResponseMapper;
+ import org.apache.nifi.cluster.manager.NodeResponse;
+ import 
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
+ import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
+ import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+ import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
+ import 
org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
+ import 
org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
+ import 
org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
+ import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+ import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
+ import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
+ import org.apache.nifi.cluster.manager.exception.NodeReconnectionException;
+ import 
org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
+ import 
org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
+ import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+ import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+ import org.apache.nifi.cluster.node.Node;
+ import org.apache.nifi.cluster.node.Node.Status;
+ import org.apache.nifi.cluster.protocol.ConnectionRequest;
+ import org.apache.nifi.cluster.protocol.ConnectionResponse;
+ import org.apache.nifi.cluster.protocol.Heartbeat;
+ import org.apache.nifi.cluster.protocol.NodeBulletins;
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ import org.apache.nifi.cluster.protocol.ProtocolException;
+ import org.apache.nifi.cluster.protocol.ProtocolHandler;
+ import org.apache.nifi.cluster.protocol.StandardDataFlow;
+ import 
org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
+ import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
+ import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+ import 
org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+ import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+ import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+ import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+ import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
+ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+ import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.Heartbeater;
+ import org.apache.nifi.controller.ReportingTaskNode;
++import org.apache.nifi.controller.StandardFlowSerializer;
+ import org.apache.nifi.controller.ValidationContextFactory;
++import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+ import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
+ import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
++import org.apache.nifi.controller.reporting.ReportingTaskProvider;
+ import 
org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
++import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
+ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
++import org.apache.nifi.controller.service.ControllerServiceLoader;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+ import org.apache.nifi.controller.status.ProcessGroupStatus;
+ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+ import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+ import org.apache.nifi.controller.status.history.MetricDescriptor;
+ import org.apache.nifi.controller.status.history.StatusHistory;
+ import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+ import org.apache.nifi.controller.status.history.StatusSnapshot;
+ import org.apache.nifi.diagnostics.GarbageCollection;
+ import org.apache.nifi.diagnostics.StorageUsage;
+ import org.apache.nifi.diagnostics.SystemDiagnostics;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.engine.FlowEngine;
+ import org.apache.nifi.events.BulletinFactory;
+ import org.apache.nifi.events.VolatileBulletinRepository;
+ import org.apache.nifi.framework.security.util.SslContextFactory;
+ import org.apache.nifi.io.socket.multicast.DiscoverableService;
++import org.apache.nifi.logging.ComponentLog;
+ import org.apache.nifi.logging.NiFiLog;
+ import org.apache.nifi.nar.ExtensionManager;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.nar.NarThreadContextClassLoader;
++import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardValidationContextFactory;
+ import org.apache.nifi.remote.RemoteResourceManager;
+ import org.apache.nifi.remote.RemoteSiteListener;
+ import org.apache.nifi.remote.SocketRemoteSiteListener;
+ import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+ import org.apache.nifi.remote.cluster.NodeInformation;
+ import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
+ import org.apache.nifi.reporting.Bulletin;
+ import org.apache.nifi.reporting.BulletinRepository;
+ import org.apache.nifi.reporting.InitializationException;
+ import org.apache.nifi.reporting.ReportingInitializationContext;
+ import org.apache.nifi.reporting.ReportingTask;
+ import org.apache.nifi.reporting.Severity;
+ import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.util.DomUtils;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
++import org.apache.nifi.util.ObjectHolder;
++import org.apache.nifi.util.ReflectionUtils;
++import org.apache.nifi.web.OptimisticLockingManager;
+ import org.apache.nifi.web.Revision;
++import org.apache.nifi.web.UpdateRevision;
+ import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+ import org.apache.nifi.web.api.dto.NodeDTO;
+ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+ import org.apache.nifi.web.api.dto.ProcessorDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
+ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
+ import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
+ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+ import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+ import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+ import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+ import org.apache.nifi.web.api.entity.ProcessorEntity;
+ import org.apache.nifi.web.api.entity.ProcessorsEntity;
+ import org.apache.nifi.web.api.entity.ProvenanceEntity;
+ import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
+ import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+ import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+ import org.apache.nifi.web.util.WebUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.w3c.dom.DOMException;
+ import org.w3c.dom.Document;
+ import org.w3c.dom.Element;
+ import org.w3c.dom.NodeList;
+ import org.xml.sax.SAXException;
+ import org.xml.sax.SAXParseException;
+ 
+ import com.sun.jersey.api.client.ClientResponse;
++import org.apache.nifi.controller.service.ControllerServiceState;
++import org.apache.nifi.web.api.dto.ControllerServiceDTO;
++import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
++import org.apache.nifi.web.api.dto.ReportingTaskDTO;
++import org.apache.nifi.web.api.entity.ControllerServiceEntity;
++import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
++import org.apache.nifi.web.api.entity.ControllerServicesEntity;
++import org.apache.nifi.web.api.entity.ReportingTaskEntity;
++import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+ 
+ /**
+  * Provides a cluster manager implementation. The manager federates incoming
+  * HTTP client requests to the nodes' external API using the HTTP protocol. 
The
+  * manager also communicates with nodes using the nodes' internal socket
+  * protocol.
+  *
+  * The manager's socket address may broadcasted using multicast if a
+  * MulticastServiceBroadcaster instance is set on this instance. The manager
+  * instance must be started after setting the broadcaster.
+  *
+  * The manager may be configured with an EventManager for recording noteworthy
+  * lifecycle events (e.g., first heartbeat received, node status change).
+  *
+  * The start() and stop() methods must be called to initialize and stop the
+  * instance.
+  *
+  * @author unattributed
+  */
 -public class WebClusterManager implements HttpClusterManager, 
ProtocolHandler, ControllerServiceProvider {
++public class WebClusterManager implements HttpClusterManager, 
ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider {
+ 
+     public static final String ROOT_GROUP_ID_ALIAS = "root";
+     public static final String BULLETIN_CATEGORY = "Clustering";
+ 
+     private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
+     private static final Logger heartbeatLogger = new 
NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"));
+ 
+     /**
+      * The HTTP header to store a cluster context. An example of what may be
+      * stored in the context is a node's auditable actions in response to a
+      * cluster request. The cluster context is serialized using Java's
+      * serialization mechanism and hex encoded.
+      */
+     public static final String CLUSTER_CONTEXT_HTTP_HEADER = 
"X-ClusterContext";
+ 
+     /**
+      * HTTP Header that stores a unique ID for each request that is replicated
+      * to the nodes. This is used for logging purposes so that request
+      * information, such as timing, can be correlated between the NCM and the
+      * nodes
+      */
+     public static final String REQUEST_ID_HEADER = "X-RequestID";
+ 
+     /**
+      * The HTTP header that the NCM specifies to ask a node if they are able 
to
+      * process a given request. The value is always 150-NodeContinue. The node
+      * will respond with 150 CONTINUE if it is able to process the request, 
417
+      * EXPECTATION_FAILED otherwise.
+      */
+     public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
+     public static final int NODE_CONTINUE_STATUS_CODE = 150;
+ 
+     /**
+      * The HTTP header that the NCM specifies to indicate that a node should
+      * invalidate the specified user group. This is done to ensure that user
+      * cache is not stale when an administrator modifies a group through the 
UI.
+      */
+     public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = 
"X-ClusterInvalidateUserGroup";
+ 
+     /**
+      * The HTTP header that the NCM specifies to indicate that a node should
+      * invalidate the specified user. This is done to ensure that user cache 
is
+      * not stale when an administrator modifies a user through the UI.
+      */
+     public static final String CLUSTER_INVALIDATE_USER_HEADER = 
"X-ClusterInvalidateUser";
+ 
+     /**
+      * The default number of seconds to respond to a connecting node if the
+      * manager cannot provide it with a current data flow.
+      */
+     private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
+ 
+     public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = 
"org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
+ 
+     public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
+     public static final Pattern PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
+ 
+     public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
+     public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
+ 
+     public static final Pattern PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
+     public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
+     public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
+ 
+     public static final String PROVENANCE_URI = 
"/nifi-api/controller/provenance";
+     public static final Pattern PROVENANCE_QUERY_URI = 
Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
+     public static final Pattern PROVENANCE_EVENT_URI = 
Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
 -
++    
++    public static final String CONTROLLER_SERVICES_URI = 
"/nifi-api/controller/controller-services/node";
++    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
++    public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
++    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks/node";
++    public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
++    
+     private final NiFiProperties properties;
+     private final HttpRequestReplicator httpRequestReplicator;
+     private final HttpResponseMapper httpResponseMapper;
+     private final DataFlowManagementService dataFlowManagementService;
+     private final ClusterManagerProtocolSenderListener senderListener;
++    private final OptimisticLockingManager optimisticLockingManager;
+     private final StringEncryptor encryptor;
+     private final Queue<Heartbeat> pendingHeartbeats = new 
ConcurrentLinkedQueue<>();
+     private final ReentrantReadWriteLock resourceRWLock = new 
ReentrantReadWriteLock();
+     private final ClusterManagerLock readLock = new 
ClusterManagerLock(resourceRWLock.readLock(), "Read");
+     private final ClusterManagerLock writeLock = new 
ClusterManagerLock(resourceRWLock.writeLock(), "Write");
+ 
+     private final Set<Node> nodes = new HashSet<>();
 -    private final Set<ReportingTaskNode> reportingTasks = new HashSet<>();
++    private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = 
new ConcurrentHashMap<>();
+ 
+     // null means the dataflow should be read from disk
+     private StandardDataFlow cachedDataFlow = null;
+     private NodeIdentifier primaryNodeId = null;
 -    private Revision revision = new Revision(0L, "");
+     private Timer heartbeatMonitor;
+     private Timer heartbeatProcessor;
+     private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
+     private volatile EventManager eventManager = null;
+     private volatile ClusterNodeFirewall clusterFirewall = null;
+     private volatile AuditService auditService = null;
+     private volatile ControllerServiceProvider controllerServiceProvider = 
null;
+ 
+     private final RemoteSiteListener remoteSiteListener;
+     private final Integer remoteInputPort;
+     private final Boolean remoteCommsSecure;
+     private final BulletinRepository bulletinRepository;
+     private final String instanceId;
+     private final FlowEngine reportingTaskEngine;
+     private final Map<NodeIdentifier, ComponentStatusRepository> 
componentMetricsRepositoryMap = new HashMap<>();
+     private final StandardProcessScheduler processScheduler;
+     private final long componentStatusSnapshotMillis;
+ 
+     public WebClusterManager(final HttpRequestReplicator 
httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
+             final DataFlowManagementService dataFlowManagementService, final 
ClusterManagerProtocolSenderListener senderListener,
 -            final NiFiProperties properties, final StringEncryptor encryptor) 
{
++            final NiFiProperties properties, final StringEncryptor encryptor, 
final OptimisticLockingManager optimisticLockingManager) {
+ 
+         if (httpRequestReplicator == null) {
+             throw new IllegalArgumentException("HttpRequestReplicator may not 
be null.");
+         } else if (httpResponseMapper == null) {
+             throw new IllegalArgumentException("HttpResponseMapper may not be 
null.");
+         } else if (dataFlowManagementService == null) {
+             throw new IllegalArgumentException("DataFlowManagementService may 
not be null.");
+         } else if (senderListener == null) {
+             throw new 
IllegalArgumentException("ClusterManagerProtocolSenderListener may not be 
null.");
+         } else if (properties == null) {
+             throw new IllegalArgumentException("NiFiProperties may not be 
null.");
+         }
+ 
+         // Ensure that our encryptor/decryptor is properly initialized
+         this.httpRequestReplicator = httpRequestReplicator;
+         this.httpResponseMapper = httpResponseMapper;
+         this.dataFlowManagementService = dataFlowManagementService;
+         this.properties = properties;
 -        this.controllerServiceProvider = new 
StandardControllerServiceProvider();
+         this.bulletinRepository = new VolatileBulletinRepository();
+         this.instanceId = UUID.randomUUID().toString();
+         this.senderListener = senderListener;
+         this.encryptor = encryptor;
++        this.optimisticLockingManager = optimisticLockingManager;
+         senderListener.addHandler(this);
+         senderListener.setBulletinRepository(bulletinRepository);
+ 
+         final String snapshotFrequency = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, 
NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
+         long snapshotMillis;
+         try {
+             snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, 
TimeUnit.MILLISECONDS);
+         } catch (final Exception e) {
+             snapshotMillis = 
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY,
 TimeUnit.MILLISECONDS);
+         }
+         componentStatusSnapshotMillis = snapshotMillis;
+ 
+         remoteInputPort = properties.getRemoteInputPort();
+         if (remoteInputPort == null) {
+             remoteSiteListener = null;
+             remoteCommsSecure = null;
+         } else {
+             // Register the ClusterManagerServerProtocol as the appropriate 
resource for site-to-site Server Protocol
+             
RemoteResourceManager.setServerProtocolImplementation(ClusterManagerServerProtocol.RESOURCE_NAME,
 ClusterManagerServerProtocol.class);
+             remoteCommsSecure = properties.isSiteToSiteSecure();
+             if (remoteCommsSecure) {
+                 final SSLContext sslContext = 
SslContextFactory.createSslContext(properties, false);
+ 
+                 if (sslContext == null) {
+                     throw new IllegalStateException("NiFi Configured to allow 
Secure Site-to-Site communications but the Keystore/Truststore properties are 
not configured");
+                 }
+ 
+                 remoteSiteListener = new 
SocketRemoteSiteListener(remoteInputPort.intValue(), sslContext, this);
+             } else {
+                 remoteSiteListener = new 
SocketRemoteSiteListener(remoteInputPort.intValue(), null, this);
+             }
+         }
+ 
+         reportingTaskEngine = new FlowEngine(8, "Reporting Task Thread");
+ 
+         processScheduler = new StandardProcessScheduler(new Heartbeater() {
+             @Override
+             public void heartbeat() {
+             }
+         }, this, encryptor);
++        
++        // When we construct the scheduling agents, we can pass null for a 
lot of the arguments because we are only
++        // going to be scheduling Reporting Tasks. Otherwise, it would not be 
okay.
+         processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, 
new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
++        processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, 
new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
+         processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 
10);
+         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 
10);
++        
++        controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler);
+     }
+ 
+     public void start() throws IOException {
+         writeLock.lock();
+         try {
+ 
+             if (isRunning()) {
+                 throw new IllegalStateException("Instance is already 
started.");
+             }
+ 
+             try {
+                 // setup heartbeat monitoring
+                 heartbeatMonitor = new Timer("Heartbeat Monitor", /* is 
daemon */ true);
+                 heartbeatMonitor.scheduleAtFixedRate(new 
HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 
1000);
+ 
+                 heartbeatProcessor = new Timer("Process Pending Heartbeats", 
true);
+                 final int processPendingHeartbeatDelay = 1000 * Math.max(1, 
getClusterProtocolHeartbeatSeconds() / 2);
+                 heartbeatProcessor.schedule(new 
ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, 
processPendingHeartbeatDelay);
+ 
+                 // start request replication service
+                 httpRequestReplicator.start();
+ 
+                 // start protocol service
+                 senderListener.start();
+ 
+                 // start flow management service
+                 dataFlowManagementService.start();
+ 
+                 if (remoteSiteListener != null) {
+                     remoteSiteListener.start();
+                 }
+ 
+                 // load flow
++                final ClusterDataFlow clusterDataFlow;
+                 if (dataFlowManagementService.isFlowCurrent()) {
 -                    final ClusterDataFlow clusterDataFlow = 
dataFlowManagementService.loadDataFlow();
++                    clusterDataFlow = 
dataFlowManagementService.loadDataFlow();
+                     cachedDataFlow = clusterDataFlow.getDataFlow();
+                     primaryNodeId = clusterDataFlow.getPrimaryNodeId();
+                 } else {
+                     throw new IOException("Flow is not current.");
+                 }
+ 
++                final byte[] serializedServices = 
clusterDataFlow.getControllerServices();
++                if ( serializedServices != null && serializedServices.length 
> 0 ) {
++                      ControllerServiceLoader.loadControllerServices(this, 
new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, 
properties.getAutoResumeState());
++                }
++                
+                 // start multicast broadcasting service, if configured
+                 if (servicesBroadcaster != null) {
+                     servicesBroadcaster.start();
+                 }
+ 
+                 // start in safe mode
+                 executeSafeModeTask();
+ 
+                 // Load and start running Reporting Tasks
 -                final File taskFile = new 
File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
 -                reportingTasks.addAll(loadReportingTasks(taskFile));
++                final byte[] serializedReportingTasks = 
clusterDataFlow.getReportingTasks();
++                if ( serializedReportingTasks != null && 
serializedReportingTasks.length > 0 ) {
++                      loadReportingTasks(serializedReportingTasks);
++                }
+             } catch (final IOException ioe) {
+                 logger.warn("Failed to initialize cluster services due to: " 
+ ioe, ioe);
+                 stop();
+                 throw ioe;
+             }
+ 
+         } finally {
+             writeLock.unlock("START");
+         }
+     }
+ 
+     public void stop() throws IOException {
+         writeLock.lock();
+         try {
+ 
+             // returns true if any service is running
+             if (isRunning() == false) {
+                 throw new IllegalArgumentException("Instance is already 
stopped.");
+             }
+ 
+             boolean encounteredException = false;
+ 
+             // stop the heartbeat monitoring
+             if (isHeartbeatMonitorRunning()) {
+                 heartbeatMonitor.cancel();
+                 heartbeatMonitor = null;
+             }
+ 
+             if (heartbeatProcessor != null) {
+                 heartbeatProcessor.cancel();
+                 heartbeatProcessor = null;
+             }
+ 
+             // stop the HTTP request replicator service
+             if (httpRequestReplicator.isRunning()) {
+                 httpRequestReplicator.stop();
+             }
+ 
+             // stop the flow management service
+             if (dataFlowManagementService.isRunning()) {
+                 dataFlowManagementService.stop();
+             }
+ 
+             if (remoteSiteListener != null) {
+                 remoteSiteListener.stop();
+             }
+ 
+             // stop the protocol listener service
+             if (senderListener.isRunning()) {
+                 try {
+                     senderListener.stop();
+                 } catch (final IOException ioe) {
+                     encounteredException = true;
+                     logger.warn("Failed to shutdown protocol service due to: 
" + ioe, ioe);
+                 }
+             }
+ 
+             // stop the service broadcaster
+             if (isBroadcasting()) {
+                 servicesBroadcaster.stop();
+             }
+ 
+             if ( processScheduler != null ) {
+                 processScheduler.shutdown();
+             }
+             
+             if (encounteredException) {
+                 throw new IOException("Failed to shutdown Cluster Manager 
because one or more cluster services failed to shutdown.  Check the logs for 
details.");
+             }
+ 
+         } finally {
+             writeLock.unlock("STOP");
+         }
+     }
+ 
+     public boolean isRunning() {
+         readLock.lock();
+         try {
+             return isHeartbeatMonitorRunning()
+                     || httpRequestReplicator.isRunning()
+                     || senderListener.isRunning()
+                     || dataFlowManagementService.isRunning()
+                     || isBroadcasting();
+         } finally {
+             readLock.unlock("isRunning");
+         }
+     }
+ 
+     @Override
+     public boolean canHandle(ProtocolMessage msg) {
+         return MessageType.CONNECTION_REQUEST == msg.getType()
+                 || MessageType.HEARTBEAT == msg.getType()
+                 || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
+                 || MessageType.BULLETINS == msg.getType()
+                 || MessageType.RECONNECTION_FAILURE == msg.getType();
+     }
+ 
+     @Override
+     public ProtocolMessage handle(final ProtocolMessage protocolMessage) 
throws ProtocolException {
+         switch (protocolMessage.getType()) {
+             case CONNECTION_REQUEST:
+                 return handleConnectionRequest((ConnectionRequestMessage) 
protocolMessage);
+             case HEARTBEAT:
+                 final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) 
protocolMessage;
+ 
+                 final Heartbeat original = heartbeatMessage.getHeartbeat();
+                 final NodeIdentifier originalNodeId = 
original.getNodeIdentifier();
+                 final Heartbeat heartbeatWithDn = new 
Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), 
original.isPrimary(), original.isConnected(), original.getPayload());
+ 
+                 handleHeartbeat(heartbeatWithDn);
+                 return null;
+             case CONTROLLER_STARTUP_FAILURE:
+                 new Thread(new Runnable() {
+                     @Override
+                     public void run() {
+                         
handleControllerStartupFailure((ControllerStartupFailureMessage) 
protocolMessage);
+                     }
+                 }, "Handle Controller Startup Failure Message from " + 
((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start();
+                 return null;
+             case RECONNECTION_FAILURE:
+                 new Thread(new Runnable() {
+                     @Override
+                     public void run() {
+                         
handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage);
+                     }
+                 }, "Handle Reconnection Failure Message from " + 
((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
+                 return null;
+             case BULLETINS:
+                 final NodeBulletinsMessage bulletinsMessage = 
(NodeBulletinsMessage) protocolMessage;
+                 handleBulletins(bulletinsMessage.getBulletins());
+                 return null;
+             default:
+                 throw new ProtocolException("No handler defined for message 
type: " + protocolMessage.getType());
+         }
+     }
+ 
+     /**
+      * Services connection requests. If the data flow management service is
+      * unable to provide a current copy of the data flow, then the returned
+      * connection response will indicate the node should try later. Otherwise,
+      * the connection response will contain the the flow and the node
+      * identifier.
+      *
+      * If this instance is configured with a firewall and the request is
+      * blocked, then the response will not contain a node identifier.
+      *
+      * @param request a connection request
+      *
+      * @return a connection response
+      */
+     @Override
+     public ConnectionResponse requestConnection(final ConnectionRequest 
request) {
+         final boolean lockObtained = writeLock.tryLock(3, TimeUnit.SECONDS);
+         if (!lockObtained) {
+             // Create try-later response because we are too busy to service 
the request right now. We do not want
+             // to wait long because we want Node/NCM comms to be very 
responsive
+             final int tryAgainSeconds;
+             if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
+                 tryAgainSeconds = 
DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
+             } else {
+                 tryAgainSeconds = 
dataFlowManagementService.getRetrievalDelaySeconds();
+             }
+ 
+             // record event
+             final String msg = "Connection requested from node, but manager 
was too busy to service request.  Instructing node to try again in " + 
tryAgainSeconds + " seconds.";
+             addEvent(request.getProposedNodeIdentifier(), msg);
+             addBulletin(request.getProposedNodeIdentifier(), Severity.INFO, 
msg);
+ 
+             // return try later response
+             return new ConnectionResponse(tryAgainSeconds);
+         }
+ 
+         try {
+             // resolve the proposed node identifier to a valid node identifier
+             final NodeIdentifier resolvedNodeIdentifier = 
resolveProposedNodeIdentifier(request.getProposedNodeIdentifier());
+ 
+             if 
(isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
+                 // if the socket address is not listed in the firewall, then 
return a null response
+                 logger.info("Firewall blocked connection request from node " 
+ resolvedNodeIdentifier);
+                 return ConnectionResponse.createBlockedByFirewallResponse();
+             }
+ 
+             // get a raw reference to the node (if it doesn't exist, node 
will be null)
+             Node node = getRawNode(resolvedNodeIdentifier.getId());
+ 
+             // create a new node if necessary and set status to connecting
+             if (node == null) {
+                 node = new Node(resolvedNodeIdentifier, Status.CONNECTING);
+                 addEvent(node.getNodeId(), "Connection requested from new 
node.  Setting status to connecting.");
+                 nodes.add(node);
+             } else {
+                 node.setStatus(Status.CONNECTING);
+                 addEvent(resolvedNodeIdentifier, "Connection requested from 
existing node.  Setting status to connecting");
+             }
+ 
+             // record the time of the connection request
+             node.setConnectionRequestedTimestamp(new Date().getTime());
+ 
+             // clear out old heartbeat info
+             node.setHeartbeat(null);
+ 
+             // try to obtain a current flow
+             if (dataFlowManagementService.isFlowCurrent()) {
+                 // if a cached copy does not exist, load it from disk
+                 if (cachedDataFlow == null) {
+                     final ClusterDataFlow clusterDataFlow = 
dataFlowManagementService.loadDataFlow();
+                     cachedDataFlow = clusterDataFlow.getDataFlow();
+                     primaryNodeId = clusterDataFlow.getPrimaryNodeId();
+                 }
+ 
+                 // determine if this node should be assigned the primary role
+                 final boolean primaryRole;
+                 if (primaryNodeId == null || 
primaryNodeId.logicallyEquals(node.getNodeId())) {
+                     setPrimaryNodeId(node.getNodeId());
+                     addEvent(node.getNodeId(), "Setting primary role in 
connection response.");
+                     primaryRole = true;
+                 } else {
+                     primaryRole = false;
+                 }
+ 
+                 return new ConnectionResponse(node.getNodeId(), 
cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId);
+             }
+ 
+             /*
+              * The manager does not have a current copy of the data flow, 
+              * so it will instruct the node to try connecting at a later 
+              * time.  Meanwhile, the flow will be locked down from user 
+              * changes because the node is marked as connecting.
+              */
+ 
+             /*
+              * Create try-later response based on flow retrieval delay to 
give 
+              * the flow management service a chance to retrieve a curren flow
+              */
+             final int tryAgainSeconds;
+             if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
+                 tryAgainSeconds = 
DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
+             } else {
+                 tryAgainSeconds = 
dataFlowManagementService.getRetrievalDelaySeconds();
+             }
+ 
+             // record event
+             addEvent(node.getNodeId(), "Connection requested from node, but 
manager was unable to obtain current flow.  Instructing node to try again in " 
+ tryAgainSeconds + " seconds.");
+ 
+             // return try later response
+             return new ConnectionResponse(tryAgainSeconds);
+ 
+         } finally {
+             writeLock.unlock("requestConnection");
+         }
+     }
+ 
+     /**
+      * Services reconnection requests for a given node. If the node indicates
+      * reconnection failure, then the node will be set to disconnected and if
+      * the node has primary role, then the role will be revoked. Otherwise, a
+      * reconnection request will be sent to the node, initiating the 
connection
+      * handshake.
+      *
+      * @param nodeId a node identifier
+      *
+      * @throws UnknownNodeException if the node does not exist
+      * @throws IllegalNodeReconnectionException if the node cannot be
+      * reconnected because the node is not disconnected
+      * @throws NodeReconnectionException if the reconnection message failed to
+      * be sent or the cluster could not provide a current data flow for the
+      * reconnection request
+      */
+     @Override
+     public void requestReconnection(final String nodeId, final String userDn) 
throws UnknownNodeException, IllegalNodeReconnectionException {
+         Node node = null;
+ 
+         final boolean primaryRole;
+         final int tryAgainSeconds;
+ 
+         writeLock.lock();
+         try {
+             // check if we know about this node and that it is disconnected
+             node = getRawNode(nodeId);
+             logger.info("Request was made by {} to reconnect node {} to 
cluster", userDn, node == null ? nodeId : node);
+ 
+             if (node == null) {
+                 throw new UnknownNodeException("Node does not exist.");
+             } else if (Status.DISCONNECTED != node.getStatus()) {
+                 throw new IllegalNodeReconnectionException("Node must be 
disconnected before it can reconnect.");
+             }
+ 
+             // clear out old heartbeat info
+             node.setHeartbeat(null);
+ 
+             // get the dataflow to send with the reconnection request
+             if (!dataFlowManagementService.isFlowCurrent()) {
+                 /* node remains disconnected */
+                 final String msg = "Reconnection requested for node, but 
manager was unable to obtain current flow.  Setting node to disconnected.";
+                 addEvent(node.getNodeId(), msg);
+                 addBulletin(node, Severity.WARNING, msg);
+                 throw new NodeReconnectionException("Manager was unable to 
obtain current flow to provide in reconnection request to node.  Try again in a 
few seconds.");
+             }
+ 
+             // if a cached copy does not exist, load it from disk
+             if (cachedDataFlow == null) {
+                 final ClusterDataFlow clusterDataFlow = 
dataFlowManagementService.loadDataFlow();
+                 cachedDataFlow = clusterDataFlow.getDataFlow();
+                 primaryNodeId = clusterDataFlow.getPrimaryNodeId();
+             }
+ 
+             node.setStatus(Status.CONNECTING);
+             addEvent(node.getNodeId(), "Reconnection requested for node.  
Setting status to connecting.");
+ 
+             // determine if this node should be assigned the primary role
+             if (primaryNodeId == null || 
primaryNodeId.logicallyEquals(node.getNodeId())) {
+                 setPrimaryNodeId(node.getNodeId());
+                 addEvent(node.getNodeId(), "Setting primary role in 
reconnection request.");
+                 primaryRole = true;
+             } else {
+                 primaryRole = false;
+             }
+ 
+             if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
+                 tryAgainSeconds = 
DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
+             } else {
+                 tryAgainSeconds = 
dataFlowManagementService.getRetrievalDelaySeconds();
+             }
+         } catch (final UnknownNodeException | 
IllegalNodeReconnectionException | NodeReconnectionException une) {
+             throw une;
+         } catch (final Exception ex) {
+             logger.warn("Problem encountered issuing reconnection request to 
node " + node.getNodeId() + " due to: " + ex, ex);
+ 
+             node.setStatus(Status.DISCONNECTED);
+             final String eventMsg = "Problem encountered issuing reconnection 
request. Node will remain disconnected: " + ex;
+             addEvent(node.getNodeId(), eventMsg);
+             addBulletin(node, Severity.WARNING, eventMsg);
+ 
+             // Exception thrown will include node ID but event/bulletin do 
not because the node/id is passed along with the message
+             throw new NodeReconnectionException("Problem encountered issuing 
reconnection request to " + node.getNodeId() + ". Node will remain 
disconnected: " + ex, ex);
+         } finally {
+             writeLock.unlock("requestReconnection");
+         }
+ 
+         // Asynchronously start attempting reconnection. This is not 
completely thread-safe, as
+         // we do this by releasing the write lock and then obtaining a read 
lock for each attempt,
+         // so we suffer from the ABA problem. However, we are willing to 
accept the consequences of
+         // this situation in order to avoid holding a lock for the entire 
duration. "The consequences"
+         // are that a second thread could potentially be doing the same 
thing, issuing a reconnection request.
+         // However, this is very unlikely to happen, based on the conditions 
under which we issue a reconnection
+         // request. And if we do, the node will simply reconnect multiple 
times, which is not a big deal.
+         requestReconnectionAsynchronously(node, primaryRole, 10, 
tryAgainSeconds);
+     }
+ 
+     private void requestReconnectionAsynchronously(final Node node, final 
boolean primaryRole, final int reconnectionAttempts, final int retrySeconds) {
+         final Thread reconnectionThread = new Thread(new Runnable() {
+             @Override
+             public void run() {
+                 for (int i = 0; i < reconnectionAttempts; i++) {
+                     final ReconnectionRequestMessage request = new 
ReconnectionRequestMessage();
+ 
+                     try {
+                         readLock.lock();
+                         try {
+                             if (Status.CONNECTING != node.getStatus()) {
+                                 // the node status has changed. It's no 
longer appropriate to attempt reconnection.
+                                 return;
+                             }
+ 
+                             // create the request
+                             request.setNodeId(node.getNodeId());
+                             request.setDataFlow(cachedDataFlow);
+                             request.setPrimary(primaryRole);
+                             
request.setManagerRemoteSiteCommsSecure(remoteCommsSecure);
+                             
request.setManagerRemoteSiteListeningPort(remoteInputPort);
+                             request.setInstanceId(instanceId);
+                         } finally {
+                             readLock.unlock("Reconnect " + node.getNodeId());
+                         }
+ 
+                         // Issue a reconnection request to the node.
+                         senderListener.requestReconnection(request);
+ 
+                         
node.setConnectionRequestedTimestamp(System.currentTimeMillis());
+ 
+                         // successfully told node to reconnect -- we're done!
+                         return;
+                     } catch (final Exception e) {
+                         logger.warn("Problem encountered issuing reconnection 
request to node " + node.getNodeId() + " due to: " + e);
+                         if (logger.isDebugEnabled()) {
+                             logger.warn("", e);
+                         }
+ 
+                         addBulletin(node, Severity.WARNING, "Problem 
encountered issuing reconnection request to node " + node.getNodeId() + " due 
to: " + e);
+                     }
+ 
+                     try {
+                         Thread.sleep(1000L * retrySeconds);
+                     } catch (final InterruptedException ie) {
+                         break;
+                     }
+                 }
+ 
+                 // We failed to reconnect 10 times. We must now mark node as 
disconnected.
+                 writeLock.lock();
+                 try {
+                     if (Status.CONNECTING == node.getStatus()) {
+                         requestDisconnectionQuietly(node.getNodeId(), "Failed 
to issue Reconnection Request " + reconnectionAttempts + " times");
+                     }
+                 } finally {
+                     writeLock.unlock("Mark node as Disconnected as a result 
of reconnection failure");
+                 }
+             }
+         }, "Reconnect " + node.getNodeId());
+ 
+         reconnectionThread.start();
+     }
+ 
 -    private List<ReportingTaskNode> loadReportingTasks(final File 
taskConfigXml) {
 -        final List<ReportingTaskNode> tasks = new ArrayList<>();
 -        if (taskConfigXml == null) {
 -            logger.info("No controller tasks to start");
 -            return tasks;
 -        }
++    private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] 
serialized) {
++        final Map<String, ReportingTaskNode> tasks = new HashMap<>();
+ 
+         try {
 -            final URL schemaUrl = 
getClass().getResource("/ReportingTaskConfiguration.xsd");
 -            final Document document = parse(taskConfigXml, schemaUrl);
++            final Document document = parse(serialized);
+ 
 -            final NodeList tasksNodes = 
document.getElementsByTagName("tasks");
++            final NodeList tasksNodes = 
document.getElementsByTagName("reportingTasks");
+             final Element tasksElement = (Element) tasksNodes.item(0);
+ 
+             //optional properties for all ReportingTasks
 -            for (final Element taskElement : 
DomUtils.getChildElementsByTagName(tasksElement, "task")) {
++            for (final Element taskElement : 
DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) {
+                 //add global properties common to all tasks
+                 Map<String, String> properties = new HashMap<>();
+ 
+                 //get properties for the specific reporting task - id, name, 
class,
+                 //and schedulingPeriod must be set
+                 final String taskId = DomUtils.getChild(taskElement, 
"id").getTextContent().trim();
+                 final String taskName = DomUtils.getChild(taskElement, 
"name").getTextContent().trim();
+ 
+                 final List<Element> schedulingStrategyNodeList = 
DomUtils.getChildElementsByTagName(taskElement, "schedulingStrategy");
+                 String schedulingStrategyValue = 
SchedulingStrategy.TIMER_DRIVEN.name();
+                 if (schedulingStrategyNodeList.size() == 1) {
+                     final String specifiedValue = 
schedulingStrategyNodeList.get(0).getTextContent();
+ 
+                     try {
+                         schedulingStrategyValue = 
SchedulingStrategy.valueOf(specifiedValue).name();
+                     } catch (final Exception e) {
+                         throw new RuntimeException("Cannot start Reporting 
Task with id " + taskId + " because its Scheduling Strategy does not have a 
valid value", e);
+                     }
+                 }
+ 
+                 final SchedulingStrategy schedulingStrategy = 
SchedulingStrategy.valueOf(schedulingStrategyValue);
+                 final String taskSchedulingPeriod = 
DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim();
+                 final String taskClass = DomUtils.getChild(taskElement, 
"class").getTextContent().trim();
+ 
+                 //optional task-specific properties
+                 for (final Element optionalProperty : 
DomUtils.getChildElementsByTagName(taskElement, "property")) {
+                     final String name = optionalProperty.getAttribute("name");
+                     final String value = 
optionalProperty.getTextContent().trim();
+                     properties.put(name, value);
+                 }
+ 
+                 //set the class to be used for the configured reporting task
+                 final ReportingTaskNode reportingTaskNode;
+                 try {
 -                    reportingTaskNode = createReportingTask(taskClass, 
taskId);
++                    reportingTaskNode = createReportingTask(taskClass, 
taskId, false);
+                 } catch (final ReportingTaskInstantiationException e) {
+                     logger.error("Unable to load reporting task {} due to 
{}", new Object[]{taskId, e});
+                     if (logger.isDebugEnabled()) {
+                         logger.error("", e);
+                     }
+                     continue;
+                 }
+ 
+                 final ReportingTask reportingTask = 
reportingTaskNode.getReportingTask();
+ 
 -                final ReportingInitializationContext config = new 
StandardReportingInitializationContext(taskId, taskName, schedulingStrategy, 
taskSchedulingPeriod, this);
++                final ComponentLog componentLog = new 
SimpleProcessLogger(taskId, reportingTask);
++                final ReportingInitializationContext config = new 
StandardReportingInitializationContext(taskId, taskName, 
++                        schedulingStrategy, taskSchedulingPeriod, 
componentLog, this);
+                 reportingTask.initialize(config);
+ 
+                 final Map<PropertyDescriptor, String> resolvedProps;
+                 try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
+                     resolvedProps = new HashMap<>();
+                     for (final Map.Entry<String, String> entry : 
properties.entrySet()) {
+                         final PropertyDescriptor descriptor = 
reportingTask.getPropertyDescriptor(entry.getKey());
+                         resolvedProps.put(descriptor, entry.getValue());
+                     }
+                 }
+ 
+                 for (final Map.Entry<PropertyDescriptor, String> entry : 
resolvedProps.entrySet()) {
+                     reportingTaskNode.setProperty(entry.getKey().getName(), 
entry.getValue());
+                 }
+ 
+                 processScheduler.schedule(reportingTaskNode);
 -                tasks.add(reportingTaskNode);
++                tasks.put(reportingTaskNode.getIdentifier(), 
reportingTaskNode);
+             }
+         } catch (final SAXException | ParserConfigurationException | 
IOException | DOMException | NumberFormatException | InitializationException t) 
{
 -            logger.error("Unable to load reporting tasks from {} due to {}", 
new Object[]{taskConfigXml, t});
++            logger.error("Unable to load reporting tasks due to {}", new 
Object[]{t});
+             if (logger.isDebugEnabled()) {
+                 logger.error("", t);
+             }
+         }
+ 
+         return tasks;
+     }
+ 
 -    private ReportingTaskNode createReportingTask(final String type, final 
String id) throws ReportingTaskInstantiationException {
++    
++    @Override
++    public ReportingTaskNode createReportingTask(final String type, final 
String id, final boolean firstTimeAdded) throws 
ReportingTaskInstantiationException {
+         if (type == null) {
+             throw new NullPointerException();
+         }
+         ReportingTask task = null;
+         final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
+         try {
+             final ClassLoader detectedClassLoader = 
ExtensionManager.getClassLoader(type);
+             final Class<?> rawClass;
+             if (detectedClassLoader == null) {
+                 rawClass = Class.forName(type);
+             } else {
+                 rawClass = Class.forName(type, false, detectedClassLoader);
+             }
+ 
+             Thread.currentThread().setContextClassLoader(detectedClassLoader);
+             final Class<? extends ReportingTask> reportingTaskClass = 
rawClass.asSubclass(ReportingTask.class);
+             final Object reportingTaskObj = reportingTaskClass.newInstance();
+             task = reportingTaskClass.cast(reportingTaskObj);
+         } catch (final ClassNotFoundException | SecurityException | 
InstantiationException | IllegalAccessException | IllegalArgumentException t) {
+             throw new ReportingTaskInstantiationException(type, t);
+         } finally {
+             if (ctxClassLoader != null) {
+                 Thread.currentThread().setContextClassLoader(ctxClassLoader);
+             }
+         }
+ 
+         final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(this);
+         final ReportingTaskNode taskNode = new 
ClusteredReportingTaskNode(task, id, processScheduler,
+                 new ClusteredEventAccess(this), bulletinRepository, 
controllerServiceProvider, validationContextFactory);
++        taskNode.setName(task.getClass().getSimpleName());
++        
++        reportingTasks.put(id, taskNode);
++        if ( firstTimeAdded ) {
++            try (final NarCloseable x = NarCloseable.withNarLoader()) {
++                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
task);
++            } catch (final Exception e) {
++                throw new ProcessorLifeCycleException("Failed to invoke 
On-Added Lifecycle methods of " + task, e);
++            }
++        }
++        
+         return taskNode;
+     }
+ 
 -    private Document parse(final File xmlFile, final URL schemaUrl) throws 
SAXException, ParserConfigurationException, IOException {
 -        final SchemaFactory schemaFactory = 
SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
 -        final Schema schema = schemaFactory.newSchema(schemaUrl);
++    private Document parse(final byte[] serialized) throws SAXException, 
ParserConfigurationException, IOException {
+         final DocumentBuilderFactory docFactory = 
DocumentBuilderFactory.newInstance();
 -        docFactory.setSchema(schema);
+         final DocumentBuilder builder = docFactory.newDocumentBuilder();
+ 
+         builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
+             @Override
+             public void fatalError(final SAXParseException err) throws 
SAXException {
+                 logger.error("Config file line " + err.getLineNumber() + ", 
col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + 
err.getMessage());
+                 if (logger.isDebugEnabled()) {
+                     logger.error("Error Stack Dump", err);
+                 }
+                 throw err;
+             }
+ 
+             @Override
+             public void error(final SAXParseException err) throws 
SAXParseException {
+                 logger.error("Config file line " + err.getLineNumber() + ", 
col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + 
err.getMessage());
+                 if (logger.isDebugEnabled()) {
+                     logger.error("Error Stack Dump", err);
+                 }
+                 throw err;
+             }
+ 
+             @Override
+             public void warning(final SAXParseException err) throws 
SAXParseException {
+                 logger.warn(" Config file line " + err.getLineNumber() + ", 
uri " + err.getSystemId() + " : message : " + err.getMessage());
+                 if (logger.isDebugEnabled()) {
+                     logger.warn("Warning stack dump", err);
+                 }
+                 throw err;
+             }
+         });
+ 
+         // build the docuemnt
 -        final Document document = builder.parse(xmlFile);
 -
 -        // ensure schema compliance
 -        final Validator validator = schema.newValidator();
 -        validator.validate(new DOMSource(document));
 -
++        final Document document = builder.parse(new 
ByteArrayInputStream(serialized));
+         return document;
+     }
+ 
+     private void addBulletin(final Node node, final Severity severity, final 
String msg) {
+         addBulletin(node.getNodeId(), severity, msg);
+     }
+ 
+     private void addBulletin(final NodeIdentifier nodeId, final Severity 
severity, final String msg) {
+         
bulletinRepository.addBulletin(BulletinFactory.createBulletin(BULLETIN_CATEGORY,
 severity.toString(),
+                 nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + 
msg));
+     }
+ 
+     /**
+      * Services a disconnection request.
+      *
+      * @param nodeId a node identifier
+      * @param userDn the DN of the user requesting the disconnection
+      *
+      * @throws UnknownNodeException if the node does not exist
+      * @throws IllegalNodeDisconnectionException if the node cannot be
+      * disconnected due to the cluster's state (e.g., node is last connected
+      * node or node is primary)
+      * @throws NodeDisconnectionException if the disconnection message fails 
to
+      * be sent.
+      */
+     @Override
+     public void requestDisconnection(final String nodeId, final String 
userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, 
NodeDisconnectionException {
+         writeLock.lock();
+         try {
+             // check that the node is known
+             final Node node = getNode(nodeId);
+             if (node == null) {
+                 throw new UnknownNodeException("Node does not exist.");
+             }
+             requestDisconnection(node.getNodeId(), /* ignore last node */ 
false, "User " + userDn + " Disconnected Node");
+         } finally {
+             writeLock.unlock("requestDisconnection(String)");
+         }
+     }
+ 
+     /**
+      * Requests a disconnection to the node with the given node ID, but any
+      * exception thrown is suppressed.
+      *
+      * @param nodeId the node ID
+      */
+     private void requestDisconnectionQuietly(final NodeIdentifier nodeId, 
final String explanation) {
+         try {
+             requestDisconnection(nodeId, /* ignore node check */ true, 
explanation);
+         } catch (final IllegalNodeDisconnectionException | 
NodeDisconnectionException ex) { /* suppress exception */ }
+     }
+ 
+     /**
+      * Issues a disconnection message to the node identified by the given node
+      * ID. If the node is not known, then a UnknownNodeException is thrown. If
+      * the node cannot be disconnected due to the cluster's state and
+      * ignoreLastNodeCheck is false, then a IllegalNodeDisconnectionException 
is
+      * thrown. Otherwise, a disconnection message is issued to the node.
+      *
+      * Whether the disconnection message is successfully sent to the node, the
+      * node is marked as disconnected and if the node is the primary node, 
then
+      * the primary role is revoked.
+      *
+      * @param nodeId the ID of the node
+      * @param ignoreNodeChecks if false, checks will be made to ensure the
+      * cluster supports the node's disconnection (e.g., the node is not the 
last
+      * connected node in the cluster; the node is not the primary); otherwise,
+      * the request is made regardless of the cluster state
+      * @param explanation
+      *
+      * @throws IllegalNodeDisconnectionException if the node cannot be
+      * disconnected due to the cluster's state (e.g., node is last connected
+      * node or node is primary). Not thrown if ignoreNodeChecks is true.
+      * @throws NodeDisconnectionException if the disconnection message fails 
to
+      * be sent.
+      */
+     private void requestDisconnection(final NodeIdentifier nodeId, final 
boolean ignoreNodeChecks, final String explanation)
+             throws IllegalNodeDisconnectionException, 
NodeDisconnectionException {
+ 
+         writeLock.lock();
+         try {
+ 
+             // check that the node is known
+             final Node node = getRawNode(nodeId.getId());
+             if (node == null) {
+                 if (ignoreNodeChecks) {
+                     // issue the disconnection
+                     final DisconnectMessage request = new DisconnectMessage();
+                     request.setNodeId(nodeId);
+                     request.setExplanation(explanation);
+ 
+                     addEvent(nodeId, "Disconnection requested due to " + 
explanation);
+                     senderListener.disconnect(request);
+                     addEvent(nodeId, "Node disconnected due to " + 
explanation);
+                     addBulletin(nodeId, Severity.INFO, "Node disconnected due 
to " + explanation);
+                     return;
+                 } else {
+                     throw new UnknownNodeException("Node does not exist");
+                 }
+             }
+ 
+             // if necessary, check that the node may be disconnected
+             if (!ignoreNodeChecks) {
+                 final Set<NodeIdentifier> connectedNodes = 
getNodeIds(Status.CONNECTED);
+                 // cannot disconnect the last connected node in the cluster
+                 if (connectedNodes.size() == 1 && 
connectedNodes.iterator().next().equals(nodeId)) {
+                     throw new IllegalNodeDisconnectionException("Node may not 
be disconnected because it is the only connected node in the cluster.");
+                 } else if (isPrimaryNode(nodeId)) {
+                     // cannot disconnect the primary node in the cluster
+                     throw new IllegalNodeDisconnectionException("Node may not 
be disconnected because it is the primary node in the cluster.");
+                 }
+             }
+ 
+             // update status
+             node.setStatus(Status.DISCONNECTED);
+             notifyDataFlowManagementServiceOfNodeStatusChange();
+ 
+             // issue the disconnection
+             final DisconnectMessage request = new DisconnectMessage();
+             request.setNodeId(nodeId);
+             request.setExplanation(explanation);
+ 
+             addEvent(nodeId, "Disconnection requested due to " + explanation);
+             senderListener.disconnect(request);
+             addEvent(nodeId, "Node disconnected due to " + explanation);
+             addBulletin(node, Severity.INFO, "Node disconnected due to " + 
explanation);
+         } finally {
+             writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)");
+         }
+     }
+ 
+     /**
+      * Messages the node to have the primary role. If the messaging fails, 
then
+      * the node is marked as disconnected.
+      *
+      * @param nodeId the node ID to assign primary role
+      *
+      * @return true if primary role assigned; false otherwise
+      */
+     private boolean assignPrimaryRole(final NodeIdentifier nodeId) {
+         writeLock.lock();
+         try {
+             // create primary role message
+             final PrimaryRoleAssignmentMessage msg = new 
PrimaryRoleAssignmentMessage();
+             msg.setNodeId(nodeId);
+             msg.setPrimary(true);
+             logger.info("Attempting to assign primary role to node: " + 
nodeId);
+ 
+             // message 
+             senderListener.assignPrimaryRole(msg);
+ 
+             logger.info("Assigned primary role to node: " + nodeId);
+             addBulletin(nodeId, Severity.INFO, "Node assigned primary role");
+ 
+             // true indicates primary role assigned
+             return true;
+ 
+         } catch (final ProtocolException ex) {
+ 
+             logger.warn("Failed attempt to assign primary role to node " + 
nodeId + " due to " + ex);
+             addBulletin(nodeId, Severity.ERROR, "Failed to assign primary 
role to node due to: " + ex);
+ 
+             // mark node as disconnected and log/record the event
+             final Node node = getRawNode(nodeId.getId());
+             node.setStatus(Status.DISCONNECTED);
+             addEvent(node.getNodeId(), "Disconnected because of failed 
attempt to assign primary role.");
+ 
+             addBulletin(nodeId, Severity.WARNING, "Node disconnected because 
of failed attempt to assign primary role");
+ 
+             // false indicates primary role failed to be assigned
+             return false;
+         } finally {
+             writeLock.unlock("assignPrimaryRole");
+         }
+     }
+ 
+     /**
+      * Messages the node with the given node ID to no longer have the primary
+      * role. If the messaging fails, then the node is marked as disconnected.
+      *
+      * @return true if the primary role was revoked from the node; false
+      * otherwise
+      */
+     private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
+         writeLock.lock();
+         try {
+             // create primary role message
+             final PrimaryRoleAssignmentMessage msg = new 
PrimaryRoleAssignmentMessage();
+             msg.setNodeId(nodeId);
+             msg.setPrimary(false);
+             logger.info("Attempting to revoke primary role from node: " + 
nodeId);
+ 
+             // send message
+             senderListener.assignPrimaryRole(msg);
+ 
+             logger.info("Revoked primary role from node: " + nodeId);
+             addBulletin(nodeId, Severity.INFO, "Primary Role revoked from 
node");
+ 
+             // true indicates primary role was revoked
+             return true;
+         } catch (final ProtocolException ex) {
+ 
+             logger.warn("Failed attempt to revoke primary role from node " + 
nodeId + " due to " + ex);
+ 
+             // mark node as disconnected and log/record the event
+             final Node node = getRawNode(nodeId.getId());
+             node.setStatus(Status.DISCONNECTED);
+             addEvent(node.getNodeId(), "Disconnected because of failed 
attempt to revoke primary role.");
+             addBulletin(node, Severity.ERROR, "Node disconnected because of 
failed attempt to revoke primary role");
+ 
+             // false indicates primary role failed to be revoked
+             return false;
+         } finally {
+             writeLock.unlock("revokePrimaryRole");
+         }
+     }
+ 
+     private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final 
String dn) {
+         return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(),
+                 nodeId.getApiPort(), nodeId.getSocketAddress(), 
nodeId.getSocketPort(), dn);
+     }
+ 
+     private ConnectionResponseMessage handleConnectionRequest(final 
ConnectionRequestMessage requestMessage) {
+         final NodeIdentifier proposedIdentifier = 
requestMessage.getConnectionRequest().getProposedNodeIdentifier();
+         final ConnectionRequest requestWithDn = new 
ConnectionRequest(addRequestorDn(proposedIdentifier, 
requestMessage.getRequestorDN()));
+ 
+         final ConnectionResponse response = requestConnection(requestWithDn);
+         final ConnectionResponseMessage responseMessage = new 
ConnectionResponseMessage();
+         responseMessage.setConnectionResponse(response);
+         return responseMessage;
+     }
+ 
+     private void handleControllerStartupFailure(final 
ControllerStartupFailureMessage msg) {
+         writeLock.lock();
+         try {
+             final Node node = getRawNode(msg.getNodeId().getId());
+             if (node != null) {
+                 node.setStatus(Status.DISCONNECTED);
+                 addEvent(msg.getNodeId(), "Node could not join cluster 
because it failed to start up properly. Setting node to Disconnected. Node 
reported the following error: " + msg.getExceptionMessage());
+                 addBulletin(node, Severity.ERROR, "Node could not join 
cluster because it failed to start up properly. Setting node to Disconnected. 
Node reported the following error: " + msg.getExceptionMessage());
+             }
+         } finally {
+             writeLock.unlock("handleControllerStartupFailure");
+         }
+     }
+ 
+     private void handleReconnectionFailure(final ReconnectionFailureMessage 
msg) {
+         writeLock.lock();
+         try {
+             final Node node = getRawNode(msg.getNodeId().getId());
+             if (node != null) {
+                 node.setStatus(Status.DISCONNECTED);
+                 final String errorMsg = "Node could not rejoin cluster. 
Setting node to Disconnected. Node reported the following error: " + 
msg.getExceptionMessage();
+                 addEvent(msg.getNodeId(), errorMsg);
+                 addBulletin(node, Severity.ERROR, errorMsg);
+             }
+         } finally {
+             writeLock.unlock("handleControllerStartupFailure");
+         }
+     }
 -
++    
++    /**
++     * Adds an instance of a specified controller service.
++     *
++     * @param type
++     * @param id
++     * @param properties
++     * @return
++     */
++    @Override
++    public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
++      return controllerServiceProvider.createControllerService(type, id, 
firstTimeAdded);
++    }
+ 
+     @Override
+     public ControllerService getControllerService(String serviceIdentifier) {
+         return 
controllerServiceProvider.getControllerService(serviceIdentifier);
+     }
+ 
+     @Override
+     public ControllerServiceNode getControllerServiceNode(final String id) {
+         return controllerServiceProvider.getControllerServiceNode(id);
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final ControllerService 
service) {
+         return controllerServiceProvider.isControllerServiceEnabled(service);
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final String serviceIdentifier) 
{
+         return 
controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
+     }
+ 
+     @Override
 -    public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
 -        return controllerServiceProvider.createControllerService(type, id, 
firstTimeAdded);
++    public String getControllerServiceName(final String serviceIdentifier) {
++      return 
controllerServiceProvider.getControllerServiceName(serviceIdentifier);
+     }
 -    
++
+     @Override
+     public void removeControllerService(final ControllerServiceNode 
serviceNode) {
+         controllerServiceProvider.removeControllerService(serviceNode);
+     }
+     
+ 
+     @Override
+     public void enableControllerService(final ControllerServiceNode 
serviceNode) {
+         controllerServiceProvider.enableControllerService(serviceNode);
+     }
+     
+     @Override
+     public void disableControllerService(final ControllerServiceNode 
serviceNode) {
+         controllerServiceProvider.disableControllerService(serviceNode);
+     }
+     
++    @Override
++    public Set<ControllerServiceNode> getAllControllerServices() {
++      return controllerServiceProvider.getAllControllerServices();
++    }
++    
++    
++    @Override
++    public void disableReferencingServices(final ControllerServiceNode 
serviceNode) {
++        controllerServiceProvider.disableReferencingServices(serviceNode);
++    }
++    
++    @Override
++    public void enableReferencingServices(final ControllerServiceNode 
serviceNode) {
++        controllerServiceProvider.enableReferencingServices(serviceNode);
++    }
++    
++    @Override
++    public void scheduleReferencingComponents(final ControllerServiceNode 
serviceNode) {
++        controllerServiceProvider.scheduleReferencingComponents(serviceNode);
++    }
++    
++    @Override
++    public void unscheduleReferencingComponents(final ControllerServiceNode 
serviceNode) {
++        
controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
++    }
++    
++    @Override
++    public void verifyCanEnableReferencingServices(final 
ControllerServiceNode serviceNode) {
++        
controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
++    }
++    
++    @Override
++    public void verifyCanScheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
++        
controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
++    }
++    
++    @Override
++    public void verifyCanDisableReferencingServices(final 
ControllerServiceNode serviceNode) {
++        
controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
++    }
++    
++    @Override
++    public void verifyCanStopReferencingComponents(final 
ControllerServiceNode serviceNode) {
++        
controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
++    }
++    
++    private byte[] serialize(final Document doc) throws TransformerException {
++      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
++      final DOMSource domSource = new DOMSource(doc);
++        final StreamResult streamResult = new StreamResult(baos);
++
++        // configure the transformer and convert the DOM
++        final TransformerFactory transformFactory = 
TransformerFactory.newInstance();
++        final Transformer transformer = transformFactory.newTransformer();
++        
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount";, "2");
++        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
++
++        // transform the document to byte stream
++        transformer.transform(domSource, streamResult);
++        return baos.toByteArray();
++    }
++    
++    private byte[] serializeControllerServices() throws 
ParserConfigurationException, TransformerException {
++      final DocumentBuilderFactory docFactory = 
DocumentBuilderFactory.newInstance();
++        final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
++        final Document document = docBuilder.newDocument();
++      final Element rootElement = 
document.createElement("controllerServices");
++      document.appendChild(rootElement);
++      
++      for ( final ControllerServiceNode serviceNode : 
getAllControllerServices() ) {
++              StandardFlowSerializer.addControllerService(rootElement, 
serviceNode, encryptor);
++      }
++      
++      return serialize(document);
++    }
++    
++    private byte[] serializeReportingTasks() throws 
ParserConfigurationException, TransformerException {
++      final DocumentBuilderFactory docFactory = 
DocumentBuilderFactory.newInstance();
++        final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
++        final Document document = docBuilder.newDocument();
++      final Element rootElement = document.createElement("reportingTasks");
++      document.appendChild(rootElement);
++      
++      for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
++              StandardFlowSerializer.addReportingTask(rootElement, taskNode, 
encryptor);
++      }
++      
++      return serialize(document);
++    }
++    
++    
++    public void saveControllerServices() {
++      try {
++              
dataFlowManagementService.updateControllerServices(serializeControllerServices());
++      } catch (final Exception e) {
++              logger.error("Failed to save changes to NCM's Controller 
Services; changes may be lost on restart due to " + e);
++              if ( logger.isDebugEnabled() ) {
++                      logger.error("", e);
++              }
++              
++              
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller 
Services", Severity.ERROR.name(), 
++                              "Failed to save changes to NCM's Controller 
Services; changes may be lost on restart. See logs for more details."));
++      }
++    }
++    
++    public void saveReportingTasks() {
++      try {
++              
dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
++      } catch (final Exception e) {
++              logger.error("Failed to save changes to NCM's Reporting Tasks; 
changes may be lost on restart due to " + e);
++              if ( logger.isDebugEnabled() ) {
++                      logger.error("", e);
++              }
++              
++              
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting 
Tasks", Severity.ERROR.name(), 
++                              "Failed to save changes to NCM's Reporting 
Tasks; changes may be lost on restart. See logs for more details."));
++      }
++    }
++
++    @Override
++    public Set<ReportingTaskNode> getAllReportingTasks() {
++      readLock.lock();
++      try {
++              return new HashSet<>(reportingTasks.values());
++      } finally {
++              readLock.unlock("getReportingTasks");
++      }
++    }
++
++    @Override
++    public ReportingTaskNode getReportingTaskNode(final String taskId) {
++      readLock.lock();
++      try {
++              return reportingTasks.get(taskId);
++      } finally {
++              readLock.unlock("getReportingTaskNode");
++      }
++    }
++
++    @Override
++    public void startReportingTask(final ReportingTaskNode reportingTaskNode) 
{
++        reportingTaskNode.verifyCanStart();
++              processScheduler.schedule(reportingTaskNode);
++    }
++
++    
++    @Override
++    public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
++        reportingTaskNode.verifyCanStop();
++        processScheduler.unschedule(reportingTaskNode);
++    }
++
++    @Override
++    public void removeReportingTask(final ReportingTaskNode 
reportingTaskNode) {
++      writeLock.lock();
++      try {
++              final ReportingTaskNode existing = 
reportingTasks.get(reportingTaskNode.getIdentifier());
++              if ( existing == null || existing != reportingTaskNode ) {
++                  throw new IllegalStateException("Reporting Task " + 
reportingTaskNode + " does not exist in this Flow");
++              }
++              
++              reportingTaskNode.verifyCanDelete();
++              
++              try (final NarCloseable x = NarCloseable.withNarLoader()) {
++                  
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, 
reportingTaskNode.getReportingTask(), 
reportingTaskNode.getConfigurationContext());
++              }
++              
++              for ( final Map.Entry<PropertyDescriptor, String> entry : 
reportingTaskNode.getProperties().entrySet() ) {
++                  final PropertyDescriptor descriptor = entry.getKey();
++                  if (descriptor.getControllerServiceDefinition() != null ) {
++                      final String value = entry.getValue() == null ? 
descriptor.getDefaultValue() : entry.getValue();
++                      if ( value != null ) {
++                          final ControllerServiceNode serviceNode = 
controllerServiceProvider.getControllerServiceNode(value);
++                          if ( serviceNode != null ) {
++                              serviceNode.removeReference(reportingTaskNode);
++                          }
++                      }
++                  }
++              }
++              
++              reportingTasks.remove(reportingTaskNode.getIdentifier());
++      } finally {
++              writeLock.unlock("removeReportingTask");
++      }
++    }
++    
++    
++    @Override
++    public void disableReportingTask(final ReportingTaskNode reportingTask) {
++        reportingTask.verifyCanDisable();
++        processScheduler.disableReportingTask(reportingTask);
++    }
++    
++    @Override
++    public void enableReportingTask(final ReportingTaskNode reportingTask) {
++        reportingTask.verifyCanEnable();
++        processScheduler.enableReportingTask(reportingTask);
++    }
++    
+     
+     /**
+      * Handle a bulletins message.
+      *
+      * @param bulletins
+      */
+     public void handleBulletins(final NodeBulletins bulletins) {
+         final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
+         final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + 
nodeIdentifier.getApiPort();
+ 
+         // unmarshal the message
+         BulletinsPayload payload = 
BulletinsPayload.unmarshal(bulletins.getPayload());
+         for (final Bulletin bulletin : payload.getBulletins()) {
+             bulletin.setNodeAddress(nodeAddress);
+             bulletinRepository.addBulletin(bulletin);
+         }
+     }
+ 
+     /**
+      * Handles a node's heartbeat. If this heartbeat is a node's first 
heartbeat
+      * since its connection request, then the manager will mark the node as
+      * connected. If the node was previously disconnected due to a lack of
+      * heartbeat, then a reconnection request is issued. If the node was
+      * disconnected for other reasons, then a disconnection request is issued.
+      * If this instance is configured with a firewall and the heartbeat is
+      * blocked, then a disconnection request is issued.
+      *
+      * @param heartbeat
+      */
+     @Override
+     public void handleHeartbeat(final Heartbeat heartbeat) {
+         // sanity check heartbeat
+         if (heartbeat == null) {
+             throw new IllegalArgumentException("Heartbeat may not be null.");
+         } else if (heartbeat.getNodeIdentifier() == null) {
+             throw new IllegalArgumentException("Heartbeat does not contain a 
node ID.");
+         }
+ 
+         /*
+          * Processing a heartbeat requires a write lock, which may take a 
while
+          * to obtain.  Only the last heartbeat is necessary to process per 
node.
+          * Futhermore, since many could pile up, heartbeats are processed in 
+          * bulk.
+          * 
+          * The below queue stores the pending heartbeats.
+          */
+         pendingHeartbeats.add(heartbeat);
+     }
+ 
+     private void processPendingHeartbeats() {
+         Node node;
+ 
+         writeLock.lock();
+         try {
+             /*
+              * Get the most recent heartbeats for the nodes in the cluster.  
This
+              * is achieved by "draining" the pending heartbeats queue, 
populating
+              * a map that associates a node identifier with its latest 
heartbeat, and
+              * finally, getting the values of the map.
+              */
+             final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = 
new HashMap<>();
+             Heartbeat aHeartbeat;
+             while ((aHeartbeat = pendingHeartbeats.poll()) != null) {
+                 mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), 
aHeartbeat);
+             }
+             final Collection<Heartbeat> mostRecentHeartbeats = new 
ArrayList<>(mostRecentHeartbeatsMap.values());
+ 
+             // return fast if no work to do
+             if (mostRecentHeartbeats.isEmpty()) {
+                 return;
+             }
+ 
+             logNodes("Before Heartbeat Processing", heartbeatLogger);
+ 
+             final int numPendingHeartbeats = mostRecentHeartbeats.size();
+             if (heartbeatLogger.isDebugEnabled()) {
+                 heartbeatLogger.debug(String.format("Handling %s 
heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
+             }
+ 
+             for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
+                 try {
+                     // resolve the proposed node identifier to valid node 
identifier
+                     final NodeIdentifier resolvedNodeIdentifier = 
resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier());
+ 
+                     // get a raw reference to the node (if it doesn't exist, 
node will be null)
+                     node = getRawNode(resolvedNodeIdentifier.getId());
+ 
+                     // if the node thinks it has the primary role, but the 
manager has assigned the role to a different node, then revoke the role
+                     if (mostRecentHeartbeat.isPrimary() && 
!isPrimaryNode(resolvedNodeIdentifier)) {
+                         addEvent(resolvedNodeIdentifier, "Heartbeat indicates 
node is running as primary node.  Revoking primary role because primary role is 
assigned to a different node.");
+                         revokePrimaryRole(resolvedNodeIdentifier);
+                     }
+ 
+                     final boolean heartbeatIndicatesNotYetConnected = 
!mostRecentHeartbeat.isConnected();
+ 
+                     if 
(isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
+                         if (node == null) {
+                             logger.info("Firewall blocked heartbeat received 
from unknown node " + resolvedNodeIdentifier + ".  Issuing disconnection 
request.");
+                         } else {
+                             // record event
+                             addEvent(resolvedNodeIdentifier, "Firewall 
blocked received heartbeat.  Issuing disconnection request.");
+                         }
+ 
+                         // request node

<TRUNCATED>

Reply via email to