http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 0000000,346e801..1b7a3c0
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@@ -1,0 -1,3579 +1,3643 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller;
+ 
+ import static java.util.Objects.requireNonNull;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.text.DateFormat;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashSet;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledFuture;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
+ import javax.net.ssl.SSLContext;
+ 
+ import org.apache.nifi.admin.service.UserService;
++import org.apache.nifi.annotation.lifecycle.OnAdded;
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
+ import org.apache.nifi.cluster.BulletinsPayload;
+ import org.apache.nifi.cluster.HeartbeatPayload;
+ import org.apache.nifi.cluster.protocol.DataFlow;
+ import org.apache.nifi.cluster.protocol.Heartbeat;
+ import org.apache.nifi.cluster.protocol.NodeBulletins;
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+ import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+ import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+ import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.ConnectableType;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Funnel;
+ import org.apache.nifi.connectable.LocalPort;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.connectable.Position;
+ import org.apache.nifi.connectable.Size;
+ import org.apache.nifi.connectable.StandardConnection;
+ import org.apache.nifi.controller.exception.CommunicationsException;
+ import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+ import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+ import org.apache.nifi.controller.label.Label;
+ import org.apache.nifi.controller.label.StandardLabel;
+ import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+ import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
+ import org.apache.nifi.controller.repository.ContentRepository;
+ import org.apache.nifi.controller.repository.CounterRepository;
+ import org.apache.nifi.controller.repository.FlowFileEvent;
+ import org.apache.nifi.controller.repository.FlowFileEventRepository;
+ import org.apache.nifi.controller.repository.FlowFileRecord;
+ import org.apache.nifi.controller.repository.FlowFileRepository;
+ import org.apache.nifi.controller.repository.FlowFileSwapManager;
+ import org.apache.nifi.controller.repository.QueueProvider;
+ import org.apache.nifi.controller.repository.RepositoryRecord;
+ import org.apache.nifi.controller.repository.RepositoryStatusReport;
+ import org.apache.nifi.controller.repository.StandardCounterRepository;
+ import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+ import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+ import org.apache.nifi.controller.repository.claim.ContentClaim;
+ import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+ import org.apache.nifi.controller.repository.claim.ContentDirection;
+ import 
org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+ import org.apache.nifi.controller.repository.io.LimitedInputStream;
+ import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
+ import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+ import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
+ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+ import org.apache.nifi.controller.status.ConnectionStatus;
+ import org.apache.nifi.controller.status.PortStatus;
+ import org.apache.nifi.controller.status.ProcessGroupStatus;
+ import org.apache.nifi.controller.status.ProcessorStatus;
+ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+ import org.apache.nifi.controller.status.RunStatus;
+ import org.apache.nifi.controller.status.TransmissionStatus;
+ import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+ import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+ import org.apache.nifi.controller.tasks.ExpireFlowFiles;
+ import org.apache.nifi.diagnostics.SystemDiagnostics;
+ import org.apache.nifi.diagnostics.SystemDiagnosticsFactory;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.engine.FlowEngine;
+ import org.apache.nifi.events.BulletinFactory;
+ import org.apache.nifi.events.EventReporter;
+ import org.apache.nifi.events.NodeBulletinProcessingStrategy;
+ import org.apache.nifi.events.VolatileBulletinRepository;
+ import org.apache.nifi.flowfile.FlowFilePrioritizer;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.framework.security.util.SslContextFactory;
+ import org.apache.nifi.groups.ProcessGroup;
+ import org.apache.nifi.groups.RemoteProcessGroup;
+ import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+ import org.apache.nifi.groups.StandardProcessGroup;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.LogLevel;
+ import org.apache.nifi.logging.LogRepository;
+ import org.apache.nifi.logging.LogRepositoryFactory;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.logging.ProcessorLogObserver;
+ import org.apache.nifi.nar.ExtensionManager;
++import org.apache.nifi.nar.NarClassLoader;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.nar.NarThreadContextClassLoader;
+ import org.apache.nifi.processor.Processor;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessorInitializationContext;
+ import org.apache.nifi.processor.StandardValidationContextFactory;
 -import org.apache.nifi.processor.annotation.OnAdded;
+ import org.apache.nifi.provenance.ProvenanceEventRecord;
+ import org.apache.nifi.provenance.ProvenanceEventRepository;
+ import org.apache.nifi.provenance.ProvenanceEventType;
+ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+ import org.apache.nifi.remote.RemoteGroupPort;
+ import org.apache.nifi.remote.RemoteResourceManager;
+ import org.apache.nifi.remote.RemoteSiteListener;
+ import org.apache.nifi.remote.RootGroupPort;
+ import org.apache.nifi.remote.SocketRemoteSiteListener;
+ import org.apache.nifi.remote.StandardRemoteProcessGroup;
+ import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+ import org.apache.nifi.remote.StandardRootGroupPort;
+ import org.apache.nifi.remote.TransferDirection;
+ import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
+ import org.apache.nifi.reporting.Bulletin;
+ import org.apache.nifi.reporting.BulletinRepository;
+ import org.apache.nifi.reporting.EventAccess;
+ import org.apache.nifi.reporting.ReportingTask;
+ import org.apache.nifi.reporting.Severity;
+ import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.ReflectionUtils;
+ import org.apache.nifi.web.api.dto.ConnectableDTO;
+ import org.apache.nifi.web.api.dto.ConnectionDTO;
+ import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+ import org.apache.nifi.web.api.dto.FunnelDTO;
+ import org.apache.nifi.web.api.dto.LabelDTO;
+ import org.apache.nifi.web.api.dto.PortDTO;
+ import org.apache.nifi.web.api.dto.PositionDTO;
+ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+ import org.apache.nifi.web.api.dto.ProcessorDTO;
+ import org.apache.nifi.web.api.dto.RelationshipDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+ import org.apache.nifi.web.api.dto.TemplateDTO;
+ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+ import org.apache.commons.lang3.StringUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.sun.jersey.api.client.ClientHandlerException;
+ 
+ public class FlowController implements EventAccess, 
ControllerServiceProvider, Heartbeater, QueueProvider {
+ 
+     // default repository implementations
+     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = 
"org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
+     public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = 
"org.apache.nifi.controller.repository.FileSystemRepository";
+     public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = 
"org.apache.nifi.provenance.VolatileProvenanceRepository";
+     public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = 
"org.apache.nifi.controller.FileSystemSwapManager";
+     public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = 
"org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
+ 
+     public static final String SCHEDULE_MINIMUM_NANOSECONDS = 
"flowcontroller.minimum.nanoseconds";
+     public static final String GRACEFUL_SHUTDOWN_PERIOD = 
"nifi.flowcontroller.graceful.shutdown.seconds";
+     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
+     public static final int METRICS_RESERVOIR_SIZE = 288;   // 1 day worth of 
5-minute captures
+ 
+     public static final String ROOT_GROUP_ID_ALIAS = "root";
+     public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
+ 
+     private final AtomicInteger maxTimerDrivenThreads;
+     private final AtomicInteger maxEventDrivenThreads;
+     private final AtomicReference<FlowEngine> timerDrivenEngineRef;
+     private final AtomicReference<FlowEngine> eventDrivenEngineRef;
+ 
+     private final ContentRepository contentRepository;
+     private final FlowFileRepository flowFileRepository;
+     private final FlowFileEventRepository flowFileEventRepository;
+     private final ProvenanceEventRepository provenanceEventRepository;
+     private final VolatileBulletinRepository bulletinRepository;
+     private final StandardProcessScheduler processScheduler;
+     private final TemplateManager templateManager;
+     private final SnippetManager snippetManager;
+     private final long gracefulShutdownSeconds;
+     private final ExtensionManager extensionManager;
+     private final NiFiProperties properties;
+     private final SSLContext sslContext;
+     private final RemoteSiteListener externalSiteListener;
+     private final AtomicReference<CounterRepository> counterRepositoryRef;
+     private final AtomicBoolean initialized = new AtomicBoolean(false);
+     private final ControllerServiceProvider controllerServiceProvider;
+     private final UserService userService;
+     private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
+     private final ComponentStatusRepository componentStatusRepository;
+     private final long systemStartTime = System.currentTimeMillis();    // 
time at which the node was started
+     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = 
new ConcurrentHashMap<>();
+ 
+     // The Heartbeat Bean is used to provide an Atomic Reference to data that 
is used in heartbeats that may
+     // change while the instance is running. We do this because we want to 
generate heartbeats even if we
+     // are unable to obtain a read lock on the entire FlowController.
+     private final AtomicReference<HeartbeatBean> heartbeatBeanRef = new 
AtomicReference<>();
+     private final AtomicBoolean heartbeatsSuspended = new 
AtomicBoolean(false);
+ 
+     private final Integer remoteInputSocketPort;
+     private final Boolean isSiteToSiteSecure;
+     private Integer clusterManagerRemoteSitePort = null;
+     private Boolean clusterManagerRemoteSiteCommsSecure = null;
+ 
+     private ProcessGroup rootGroup;
+     private final List<Connectable> startConnectablesAfterInitialization;
+     private final List<RemoteGroupPort> 
startRemoteGroupPortsAfterInitialization;
+ 
+     /**
+      * true if controller is configured to operate in a clustered environment
+      */
+     private final boolean configuredForClustering;
+ 
+     /**
+      * the time to wait between heartbeats
+      */
+     private final int heartbeatDelaySeconds;
+ 
+     /**
+      * The sensitive property string encryptor *
+      */
+     private final StringEncryptor encryptor;
+ 
+     /**
+      * cluster protocol sender
+      */
+     private final NodeProtocolSender protocolSender;
+ 
+     private final ScheduledExecutorService clusterTaskExecutor = new 
FlowEngine(3, "Clustering Tasks");
+     private final ContentClaimManager contentClaimManager = new 
StandardContentClaimManager();
+ 
+     // guarded by rwLock
+     /**
+      * timer to periodically send heartbeats to the cluster
+      */
+     private ScheduledFuture<?> bulletinFuture;
+     private ScheduledFuture<?> heartbeatGeneratorFuture;
+     private ScheduledFuture<?> heartbeatSenderFuture;
+ 
+     // guarded by FlowController lock
+     /**
+      * timer task to generate heartbeats
+      */
+     private final AtomicReference<HeartbeatMessageGeneratorTask> 
heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
+ 
+     private AtomicReference<NodeBulletinProcessingStrategy> 
nodeBulletinSubscriber;
+ 
+     // guarded by rwLock
+     /**
+      * the node identifier;
+      */
+     private NodeIdentifier nodeId;
+ 
+     // guarded by rwLock
+     /**
+      * true if controller is connected or trying to connect to the cluster
+      */
+     private boolean clustered;
+     private String clusterManagerDN;
+ 
+     // guarded by rwLock
+     /**
+      * true if controller is the primary of the cluster
+      */
+     private boolean primary;
+ 
+     // guarded by rwLock
+     /**
+      * true if connected to a cluster
+      */
+     private boolean connected;
+ 
+     // guarded by rwLock
+     private String instanceId;
+ 
+     private volatile boolean shutdown = false;
+ 
+     private final ReentrantReadWriteLock rwLock = new 
ReentrantReadWriteLock();
+     private final Lock readLock = rwLock.readLock();
+     private final Lock writeLock = rwLock.writeLock();
+ 
+     private FlowFileSwapManager flowFileSwapManager;    // guarded by 
read/write lock
+ 
+     private static final Logger LOG = 
LoggerFactory.getLogger(FlowController.class);
+     private static final Logger heartbeatLogger = 
LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
+ 
+     public static FlowController createStandaloneInstance(
+             final FlowFileEventRepository flowFileEventRepo,
+             final NiFiProperties properties,
+             final UserService userService,
+             final StringEncryptor encryptor) {
+         return new FlowController(
+                 flowFileEventRepo,
+                 properties,
+                 userService,
+                 encryptor,
+                 /* configuredForClustering */ false,
+                 /* NodeProtocolSender */ null);
+     }
+ 
+     public static FlowController createClusteredInstance(
+             final FlowFileEventRepository flowFileEventRepo,
+             final NiFiProperties properties,
+             final UserService userService,
+             final StringEncryptor encryptor,
+             final NodeProtocolSender protocolSender) {
+         final FlowController flowController = new FlowController(
+                 flowFileEventRepo,
+                 properties,
+                 userService,
+                 encryptor,
+                 /* configuredForClustering */ true,
+                 /* NodeProtocolSender */ protocolSender);
+ 
+         
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), 
properties.isSiteToSiteSecure());
+ 
+         return flowController;
+     }
+ 
+     private FlowController(
+             final FlowFileEventRepository flowFileEventRepo,
+             final NiFiProperties properties,
+             final UserService userService,
+             final StringEncryptor encryptor,
+             final boolean configuredForClustering,
+             final NodeProtocolSender protocolSender) {
+ 
+         maxTimerDrivenThreads = new AtomicInteger(10);
+         maxEventDrivenThreads = new AtomicInteger(5);
+ 
+         this.encryptor = encryptor;
+         this.properties = properties;
+         sslContext = SslContextFactory.createSslContext(properties, false);
+         extensionManager = new ExtensionManager();
+         controllerServiceProvider = new StandardControllerServiceProvider();
+ 
+         timerDrivenEngineRef = new AtomicReference<>(new 
FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
+         eventDrivenEngineRef = new AtomicReference<>(new 
FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
+ 
+         final FlowFileRepository flowFileRepo = 
createFlowFileRepository(properties, contentClaimManager);
+         flowFileRepository = flowFileRepo;
+         flowFileEventRepository = flowFileEventRepo;
+         counterRepositoryRef = new AtomicReference<CounterRepository>(new 
StandardCounterRepository());
+ 
+         bulletinRepository = new VolatileBulletinRepository();
+         nodeBulletinSubscriber = new AtomicReference<>();
+ 
+         try {
+             this.provenanceEventRepository = 
createProvenanceRepository(properties);
+             
this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
+ 
+             this.contentRepository = createContentRepository(properties);
+         } catch (final Exception e) {
+             throw new RuntimeException("Unable to create Provenance 
Repository", e);
+         }
+ 
+         processScheduler = new StandardProcessScheduler(this, this, 
encryptor);
+         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, 
processScheduler);
+ 
+         final ProcessContextFactory contextFactory = new 
ProcessContextFactory(contentRepository, flowFileRepository, 
flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
+         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, 
new EventDrivenSchedulingAgent(
+                 eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, 
contextFactory, maxEventDrivenThreads.get(), encryptor));
+ 
+         final QuartzSchedulingAgent quartzSchedulingAgent = new 
QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, 
encryptor);
+         final TimerDrivenSchedulingAgent timerDrivenAgent = new 
TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, 
encryptor);
+         processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, 
timerDrivenAgent);
+         
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, 
timerDrivenAgent);
+         processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, 
quartzSchedulingAgent);
+         processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, 
contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
+ 
+         startConnectablesAfterInitialization = new ArrayList<>();
+         startRemoteGroupPortsAfterInitialization = new ArrayList<>();
+         this.userService = userService;
+ 
+         final String gracefulShutdownSecondsVal = 
properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
+         long shutdownSecs;
+         try {
+             shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal);
+             if (shutdownSecs < 1) {
+                 shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
+             }
+         } catch (final NumberFormatException nfe) {
+             shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
+         }
+         gracefulShutdownSeconds = shutdownSecs;
+ 
+         remoteInputSocketPort = properties.getRemoteInputPort();
+         isSiteToSiteSecure = properties.isSiteToSiteSecure();
+ 
+         if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort 
!= null) {
+             throw new IllegalStateException("NiFi Configured to allow Secure 
Site-to-Site communications but the Keystore/Truststore properties are not 
configured");
+         }
+ 
+         this.configuredForClustering = configuredForClustering;
+         this.heartbeatDelaySeconds = (int) 
FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), 
TimeUnit.SECONDS);
+         this.protocolSender = protocolSender;
+         try {
+             this.templateManager = new 
TemplateManager(properties.getTemplateDirectory());
+         } catch (IOException e) {
+             throw new RuntimeException(e);
+         }
+ 
+         this.snippetManager = new SnippetManager();
+ 
+         rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), 
this, processScheduler, properties, encryptor);
+         rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
+         instanceId = UUID.randomUUID().toString();
+ 
+         if (remoteInputSocketPort == null){
+             LOG.info("Not enabling Site-to-Site functionality because 
nifi.remote.input.socket.port is not set");
+             externalSiteListener = null;
+         } else if (isSiteToSiteSecure && sslContext == null) {
+             LOG.error("Unable to create Secure Site-to-Site Listener because 
not all required Keystore/Truststore Properties are set. Site-to-Site 
functionality will be disabled until this problem is has been fixed.");
+             externalSiteListener = null;
+         } else {
+             // Register the SocketFlowFileServerProtocol as the appropriate 
resource for site-to-site Server Protocol
+             
RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME,
 SocketFlowFileServerProtocol.class);
+             externalSiteListener = new 
SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext 
: null);
+             externalSiteListener.setRootGroup(rootGroup);
+         }
+ 
+         // Determine frequency for obtaining component status snapshots
+         final String snapshotFrequency = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, 
NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
+         long snapshotMillis;
+         try {
+             snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, 
TimeUnit.MILLISECONDS);
+         } catch (final Exception e) {
+             snapshotMillis = 
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY,
 TimeUnit.MILLISECONDS);
+         }
+ 
+         componentStatusRepository = createComponentStatusRepository();
+         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
+             @Override
+             public void run() {
+                 componentStatusRepository.capture(getControllerStatus());
+             }
+         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
+ 
+         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
+     }
+ 
+     private static FlowFileRepository createFlowFileRepository(final 
NiFiProperties properties, final ContentClaimManager contentClaimManager) {
+         final String implementationClassName = 
properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, 
DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             throw new RuntimeException("Cannot create FlowFile Repository 
because the NiFi Properties is missing the following property: "
+                     + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+         }
+ 
+         try {
+             final FlowFileRepository created = 
NarThreadContextClassLoader.createInstance(implementationClassName, 
FlowFileRepository.class);
+             synchronized (created) {
+                 created.initialize(contentClaimManager);
+             }
+             return created;
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private static FlowFileSwapManager createSwapManager(final NiFiProperties 
properties) {
+         final String implementationClassName = 
properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, 
DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             return null;
+         }
+ 
+         try {
+             return 
NarThreadContextClassLoader.createInstance(implementationClassName, 
FlowFileSwapManager.class);
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private static EventReporter createEventReporter(final BulletinRepository 
bulletinRepository) {
+         return new EventReporter() {
+             @Override
+             public void reportEvent(final Severity severity, final String 
category, final String message) {
+                 final Bulletin bulletin = 
BulletinFactory.createBulletin(category, severity.name(), message);
+                 bulletinRepository.addBulletin(bulletin);
+             }
+         };
+     }
+     
+     public void initializeFlow() throws IOException {
+         writeLock.lock();
+         try {
+             flowFileSwapManager = createSwapManager(properties);
+ 
+             long maxIdFromSwapFiles = -1L;
+             if (flowFileSwapManager != null) {
+                 if (flowFileRepository.isVolatile()) {
+                     flowFileSwapManager.purge();
+                 } else {
+                     maxIdFromSwapFiles = 
flowFileSwapManager.recoverSwappedFlowFiles(this, contentClaimManager);
+                 }
+             }
+ 
+             flowFileRepository.loadFlowFiles(this, maxIdFromSwapFiles + 1);
+ 
+             // now that we've loaded the FlowFiles, this has restored our 
ContentClaims' states, so we can tell the
+             // ContentRepository to purge superfluous files
+             contentRepository.cleanup();
+ 
+             if (flowFileSwapManager != null) {
+                 flowFileSwapManager.start(flowFileRepository, this, 
contentClaimManager, createEventReporter(bulletinRepository));
+             }
+ 
+             if (externalSiteListener != null) {
+                 externalSiteListener.start();
+             }
+ 
+             timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
+                 @Override
+                 public void run() {
+                     try {
+                         updateRemoteProcessGroups();
+                     } catch (final Throwable t) {
+                         LOG.warn("Unable to update Remote Process Groups due 
to " + t);
+                         if (LOG.isDebugEnabled()) {
+                             LOG.warn("", t);
+                         }
+                     }
+                 }
+             }, 0L, 30L, TimeUnit.SECONDS);
+ 
+             initialized.set(true);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * <p>
+      * Causes any processors that were added to the flow with a 'delayStart'
+      * flag of true to now start
+      * </p>
+      */
+     public void startDelayed() {
+         writeLock.lock();
+         try {
+             LOG.info("Starting {} processors/ports/funnels", 
(startConnectablesAfterInitialization.size() + 
startRemoteGroupPortsAfterInitialization.size()));
+             for (final Connectable connectable : 
startConnectablesAfterInitialization) {
+                 if (connectable.getScheduledState() == 
ScheduledState.DISABLED) {
+                     continue;
+                 }
+ 
+                 try {
+                     if (connectable instanceof ProcessorNode) {
+                         
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+                     } else {
+                         startConnectable(connectable);
+                     }
+                 } catch (final Throwable t) {
+                     LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t});
+                 }
+             }
+ 
+             startConnectablesAfterInitialization.clear();
+ 
+             int startedTransmitting = 0;
+             for (final RemoteGroupPort remoteGroupPort : 
startRemoteGroupPortsAfterInitialization) {
+                 try {
+                     
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
+                     startedTransmitting++;
+                 } catch (final Throwable t) {
+                     LOG.error("Unable to start transmitting with {} due to 
{}", new Object[]{remoteGroupPort, t});
+                 }
+             }
+ 
+             LOG.info("Started {} Remote Group Ports transmitting", 
startedTransmitting);
+             startRemoteGroupPortsAfterInitialization.clear();
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     private ContentRepository createContentRepository(final NiFiProperties 
properties) throws InstantiationException, IllegalAccessException, 
ClassNotFoundException {
+         final String implementationClassName = 
properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, 
DEFAULT_CONTENT_REPO_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             throw new RuntimeException("Cannot create Provenance Repository 
because the NiFi Properties is missing the following property: "
+                     + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+         }
+ 
+         try {
+             final ContentRepository contentRepo = 
NarThreadContextClassLoader.createInstance(implementationClassName, 
ContentRepository.class);
+             synchronized (contentRepo) {
+                 contentRepo.initialize(contentClaimManager);
+             }
+             return contentRepo;
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private ProvenanceEventRepository createProvenanceRepository(final 
NiFiProperties properties) throws InstantiationException, 
IllegalAccessException, ClassNotFoundException {
+         final String implementationClassName = 
properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, 
DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             throw new RuntimeException("Cannot create Provenance Repository 
because the NiFi Properties is missing the following property: "
+                     + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+         }
+ 
+         try {
+             return 
NarThreadContextClassLoader.createInstance(implementationClassName, 
ProvenanceEventRepository.class);
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private ComponentStatusRepository createComponentStatusRepository() {
+         final String implementationClassName = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION,
 DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             throw new RuntimeException("Cannot create Component Status 
Repository because the NiFi Properties is missing the following property: "
+                     + 
NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+         }
+ 
+         try {
+             return 
NarThreadContextClassLoader.createInstance(implementationClassName, 
ComponentStatusRepository.class);
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     /**
+      * Creates a connection between two Connectable objects.
+      *
+      * @param id required ID of the connection
+      * @param name the name of the connection, or <code>null</code> to leave 
the
+      * connection unnamed
+      * @param source required source
+      * @param destination required destination
+      * @param relationshipNames required collection of relationship names
+      * @return
+      *
+      * @throws NullPointerException if the ID, source, destination, or set of
+      * relationships is null.
+      * @throws IllegalArgumentException if <code>relationships</code> is an
+      * empty collection
+      */
+     public Connection createConnection(final String id, final String name, 
final Connectable source, final Connectable destination, final 
Collection<String> relationshipNames) {
+         final StandardConnection.Builder builder = new 
StandardConnection.Builder(processScheduler);
+ 
+         final List<Relationship> relationships = new ArrayList<>();
+         for (final String relationshipName : 
requireNonNull(relationshipNames)) {
+             relationships.add(new 
Relationship.Builder().name(relationshipName).build());
+         }
+ 
+         return builder.id(requireNonNull(id).intern()).name(name == null ? 
null : 
name.intern()).relationships(relationships).source(requireNonNull(source)).destination(destination).build();
+     }
+ 
+     /**
+      * Creates a new Label
+      *
+      * @param id
+      * @param text
+      * @return
+      * @throws NullPointerException if either argument is null
+      */
+     public Label createLabel(final String id, final String text) {
+         return new StandardLabel(requireNonNull(id).intern(), text);
+     }
+ 
+     /**
+      * Creates a funnel
+      *
+      * @param id
+      * @return
+      */
+     public Funnel createFunnel(final String id) {
+         return new StandardFunnel(id.intern(), null, processScheduler);
+     }
+ 
+     /**
+      * Creates a Port to use as an Input Port for a Process Group
+      *
+      * @param id
+      * @param name
+      * @return
+      * @throws NullPointerException if the ID or name is not unique
+      * @throws IllegalStateException if an Input Port already exists with the
+      * same name or id.
+      */
+     public Port createLocalInputPort(String id, String name) {
+         id = requireNonNull(id).intern();
+         name = requireNonNull(name).intern();
+         verifyPortIdDoesNotExist(id);
+         return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, 
processScheduler);
+     }
+ 
+     /**
+      * Creates a Port to use as an Output Port for a Process Group
+      *
+      * @param id
+      * @param name
+      * @return
+      * @throws NullPointerException if the ID or name is not unique
+      * @throws IllegalStateException if an Input Port already exists with the
+      * same name or id.
+      */
+     public Port createLocalOutputPort(String id, String name) {
+         id = requireNonNull(id).intern();
+         name = requireNonNull(name).intern();
+         verifyPortIdDoesNotExist(id);
+         return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, 
processScheduler);
+     }
+ 
+     /**
+      * Creates a ProcessGroup with the given ID
+      *
+      * @param id
+      * @return
+      * @throws NullPointerException if the argument is null
+      */
+     public ProcessGroup createProcessGroup(final String id) {
+         return new StandardProcessGroup(requireNonNull(id).intern(), this, 
processScheduler, properties, encryptor);
+     }
+ 
+     /**
+      * <p>
+      * Creates a new ProcessorNode with the given type and identifier and 
initializes it invoking the
+      * methods annotated with {@link OnAdded}.
+      * </p>
+      *
+      * @param type
+      * @param id
+      * @return
+      * @throws NullPointerException if either arg is null
+      * @throws ProcessorInstantiationException if the processor cannot be
+      * instantiated for any reason
+      */
+     public ProcessorNode createProcessor(final String type, String id) throws 
ProcessorInstantiationException {
+         return createProcessor(type, id, true);
+     }
+     
+     /**
+      * <p>
+      * Creates a new ProcessorNode with the given type and identifier and 
optionally initializes it.
+      * </p>
+      *
+      * @param type the fully qualified Processor class name
+      * @param id the unique ID of the Processor
+      * @param firstTimeAdded whether or not this is the first time this 
Processor is added to the graph. If {@code true},
+      *                       will invoke methods annotated with the {@link 
OnAdded} annotation.
+      * @return
+      * @throws NullPointerException if either arg is null
+      * @throws ProcessorInstantiationException if the processor cannot be
+      * instantiated for any reason
+      */
++    @SuppressWarnings("deprecation")
+     public ProcessorNode createProcessor(final String type, String id, final 
boolean firstTimeAdded) throws ProcessorInstantiationException {
+         id = id.intern();
+         final Processor processor = instantiateProcessor(type, id);
+         final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(controllerServiceProvider);
+         final ProcessorNode procNode = new StandardProcessorNode(processor, 
id, validationContextFactory, processScheduler, controllerServiceProvider);
+ 
+         final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
+         logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
+ 
+         if ( firstTimeAdded ) {
+             try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
processor);
++                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
org.apache.nifi.processor.annotation.OnAdded.class, processor);
+             } catch (final Exception e) {
+                 
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+                 throw new ProcessorLifeCycleException("Failed to invoke 
@OnAdded methods of " + procNode.getProcessor(), e);
+             }
+         }
+ 
+         return procNode;
+     }
+ 
+     private Processor instantiateProcessor(final String type, final String 
identifier) throws ProcessorInstantiationException {
+         Processor processor;
+ 
+         final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
+         try {
+             final ClassLoader detectedClassLoaderForType = 
ExtensionManager.getClassLoader(type);
+             final Class<?> rawClass;
+             if (detectedClassLoaderForType == null) {
+                 // try to find from the current class loader
+                 rawClass = Class.forName(type);
+             } else {
+                 // try to find from the registered classloader for that type
+                 rawClass = Class.forName(type, true, 
ExtensionManager.getClassLoader(type));
+             }
+ 
+             
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+             final Class<? extends Processor> processorClass = 
rawClass.asSubclass(Processor.class);
+             processor = processorClass.newInstance();
+             final ProcessorLog processorLogger = new 
SimpleProcessLogger(identifier, processor);
+             final ProcessorInitializationContext ctx = new 
StandardProcessorInitializationContext(identifier, processorLogger, this);
+             processor.initialize(ctx);
+             return processor;
+         } catch (final Throwable t) {
+             throw new ProcessorInstantiationException(type, t);
+         } finally {
+             if (ctxClassLoader != null) {
+                 Thread.currentThread().setContextClassLoader(ctxClassLoader);
+             }
+         }
+     }
+ 
+     /**
+      * @return the ExtensionManager used for instantiating Processors,
+      * Prioritizers, etc.
+      */
+     public ExtensionManager getExtensionManager() {
+         return extensionManager;
+     }
+ 
+     public String getInstanceId() {
+         readLock.lock();
+         try {
+             return instanceId;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Gets the BulletinRepository for storing and retrieving Bulletins.
+      *
+      * @return
+      */
+     public BulletinRepository getBulletinRepository() {
+         return bulletinRepository;
+     }
+ 
+     public SnippetManager getSnippetManager() {
+         return snippetManager;
+     }
+ 
+     /**
+      * Creates a Port to use as an Input Port for the root Process Group, 
which
+      * is used for Site-to-Site communications
+      *
+      * @param id
+      * @param name
+      * @return
+      * @throws NullPointerException if the ID or name is not unique
+      * @throws IllegalStateException if an Input Port already exists with the
+      * same name or id.
+      */
+     public Port createRemoteInputPort(String id, String name) {
+         id = requireNonNull(id).intern();
+         name = requireNonNull(name).intern();
+         verifyPortIdDoesNotExist(id);
+         return new StandardRootGroupPort(id, name, null, 
TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, userService, 
getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
+     }
+ 
+     /**
+      * Creates a Port to use as an Output Port for the root Process Group, 
which
+      * is used for Site-to-Site communications and will queue flow files 
waiting
+      * to be delivered to remote instances
+      *
+      * @param id
+      * @param name
+      * @return
+      * @throws NullPointerException if the ID or name is not unique
+      * @throws IllegalStateException if an Input Port already exists with the
+      * same name or id.
+      */
+     public Port createRemoteOutputPort(String id, String name) {
+         id = requireNonNull(id).intern();
+         name = requireNonNull(name).intern();
+         verifyPortIdDoesNotExist(id);
+         return new StandardRootGroupPort(id, name, null, 
TransferDirection.SEND, ConnectableType.OUTPUT_PORT, userService, 
getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
+     }
+ 
+     /**
+      * Creates a new Remote Process Group with the given ID that points to the
+      * given URI
+      *
+      * @param id
+      * @param uri
+      * @return
+      *
+      * @throws NullPointerException if either argument is null
+      * @throws IllegalArgumentException if <code>uri</code> is not a valid 
URI.
+      */
+     public RemoteProcessGroup createRemoteProcessGroup(final String id, final 
String uri) {
+         return new StandardRemoteProcessGroup(requireNonNull(id).intern(), 
requireNonNull(uri).intern(), null, this, sslContext);
+     }
+ 
+     /**
+      * Verifies that no output port exists with the given id or name. If this
+      * does not hold true, throws an IllegalStateException
+      *
+      * @param id
+      * @throws IllegalStateException
+      */
+     private void verifyPortIdDoesNotExist(final String id) {
+         Port port = rootGroup.findOutputPort(id);
+         if (port != null) {
+             throw new IllegalStateException("An Input Port already exists 
with ID " + id);
+         }
+         port = rootGroup.findInputPort(id);
+         if (port != null) {
+             throw new IllegalStateException("An Input Port already exists 
with ID " + id);
+         }
+     }
+ 
+     /**
+      * @return the name of this controller, which is also the name of the Root
+      * Group.
+      */
+     public String getName() {
+         readLock.lock();
+         try {
+             return rootGroup.getName();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Sets the name for the Root Group, which also changes the name for the
+      * controller.
+      *
+      * @param name
+      */
+     public void setName(final String name) {
+         readLock.lock();
+         try {
+             rootGroup.setName(name);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Gets the comments of this controller, which is also the comment of the
+      * Root Group.
+      *
+      * @return
+      */
+     public String getComments() {
+         readLock.lock();
+         try {
+             return rootGroup.getComments();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Sets the comment for the Root Group, which also changes the comment for
+      * the controller.
+      *
+      * @param comments
+      */
+     public void setComments(final String comments) {
+         readLock.lock();
+         try {
+             rootGroup.setComments(comments);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return <code>true</code> if the scheduling engine for this controller
+      * has been terminated.
+      */
+     public boolean isTerminated() {
+         this.readLock.lock();
+         try {
+             return (null == this.timerDrivenEngineRef.get() || 
this.timerDrivenEngineRef.get().isTerminated());
+         } finally {
+             this.readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Triggers the controller to begin shutdown, stopping all processors and
+      * terminating the scheduling engine. After calling this method, the
+      * {@link #isTerminated()} method will indicate whether or not the 
shutdown
+      * has finished.
+      *
+      * @param kill if <code>true</code>, attempts to stop all active threads,
+      * but makes no guarantee that this will happen
+      *
+      * @throws IllegalStateException if the controller is already stopped or
+      * currently in the processor of stopping
+      */
+     public void shutdown(final boolean kill) {
+         this.shutdown = true;
+         stopAllProcessors();
+ 
+         writeLock.lock();
+         try {
+             if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) 
{
+                 throw new IllegalStateException("Controller already stopped 
or still stopping...");
+             }
+ 
+             if (kill) {
+                 this.timerDrivenEngineRef.get().shutdownNow();
+                 this.eventDrivenEngineRef.get().shutdownNow();
+                 LOG.info("Initiated immediate shutdown of flow 
controller...");
+             } else {
+                 this.timerDrivenEngineRef.get().shutdown();
+                 this.eventDrivenEngineRef.get().shutdown();
+                 LOG.info("Initiated graceful shutdown of flow 
controller...waiting up to " + gracefulShutdownSeconds + " seconds");
+             }
+ 
+             clusterTaskExecutor.shutdown();
+ 
+             // Trigger any processors' methods marked with @OnShutdown to be 
called
+             rootGroup.shutdown();
+ 
+             try {
+                 
this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, 
TimeUnit.SECONDS);
+                 
this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, 
TimeUnit.SECONDS);
+             } catch (final InterruptedException ie) {
+                 LOG.info("Interrupted while waiting for controller 
termination.");
+             }
+ 
+             try {
+                 flowFileRepository.close();
+             } catch (final Throwable t) {
+                 LOG.warn("Unable to shut down FlowFileRepository due to {}", 
new Object[]{t});
+             }
+ 
+             if (this.timerDrivenEngineRef.get().isTerminated() && 
eventDrivenEngineRef.get().isTerminated()) {
+                 LOG.info("Controller has been terminated successfully.");
+             } else {
+                 LOG.warn("Controller hasn't terminated properly.  There 
exists an uninterruptable thread that will take an indeterminate amount of time 
to stop.  Might need to kill the program manually.");
+             }
+ 
+             if (externalSiteListener != null) {
+                 externalSiteListener.stop();
+             }
+ 
+             if (flowFileSwapManager != null) {
+                 flowFileSwapManager.shutdown();
+             }
+             
+             if ( processScheduler != null ) {
+               processScheduler.shutdown();
+             }
+             
+             if ( contentRepository != null ) {
+                 contentRepository.shutdown();
+             }
+             
+             if ( provenanceEventRepository != null ) {
+               try {
+                       provenanceEventRepository.close();
+               } catch (final IOException ioe) {
+                       LOG.warn("There was a problem shutting down the 
Provenance Repository: " + ioe.toString());
+                       if ( LOG.isDebugEnabled() ) {
+                               LOG.warn("", ioe);
+                       }
+               }
+             }
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Serializes the current state of the controller to the given 
OutputStream
+      *
+      * @param serializer
+      * @param os
+      * @throws FlowSerializationException if serialization of the flow fails 
for
+      * any reason
+      */
+     public void serialize(final FlowSerializer serializer, final OutputStream 
os) throws FlowSerializationException {
+         readLock.lock();
+         try {
+             serializer.serialize(this, os);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Synchronizes this controller with the proposed flow.
+      *
+      * For more details, see
+      * {@link FlowSynchronizer#sync(FlowController, DataFlow)}.
+      *
+      * @param synchronizer
+      * @param dataFlow the flow to load the controller with. If the flow is 
null
+      * or zero length, then the controller must not have a flow or else an
+      * UninheritableFlowException will be thrown.
+      *
+      * @throws FlowSerializationException if proposed flow is not a valid flow
+      * configuration file
+      * @throws UninheritableFlowException if the proposed flow cannot be 
loaded
+      * by the controller because in doing so would risk orphaning flow files
+      * @throws FlowSynchronizationException if updates to the controller 
failed.
+      * If this exception is thrown, then the controller should be considered
+      * unsafe to be used
+      */
+     public void synchronize(final FlowSynchronizer synchronizer, final 
DataFlow dataFlow)
+             throws FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException {
+         writeLock.lock();
+         try {
+             LOG.debug("Synchronizing controller with proposed flow");
+             synchronizer.sync(this, dataFlow, encryptor);
+             LOG.info("Successfully synchronized controller with proposed 
flow");
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return the currently configured maximum number of threads that can be
+      * used for executing processors at any given time.
+      */
+     public int getMaxTimerDrivenThreadCount() {
+         return maxTimerDrivenThreads.get();
+     }
+ 
+     public int getMaxEventDrivenThreadCount() {
+         return maxEventDrivenThreads.get();
+     }
+ 
+     public void setMaxTimerDrivenThreadCount(final int maxThreadCount) {
+         writeLock.lock();
+         try {
+             setMaxThreadCount(maxThreadCount, 
this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     public void setMaxEventDrivenThreadCount(final int maxThreadCount) {
+         writeLock.lock();
+         try {
+             setMaxThreadCount(maxThreadCount, 
this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads);
+             
processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, 
maxThreadCount);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Updates the number of threads that can be simultaneously used for
+      * executing processors.
+      *
+      * @param maxThreadCount
+      *
+      * This method must be called while holding the write lock!
+      */
+     private void setMaxThreadCount(final int maxThreadCount, final FlowEngine 
engine, final AtomicInteger maxThreads) {
+         if (maxThreadCount < 1) {
+             throw new IllegalArgumentException();
+         }
+ 
+         maxThreads.getAndSet(maxThreadCount);
+         if (null != engine && engine.getCorePoolSize() < maxThreadCount) {
+             engine.setCorePoolSize(maxThreads.intValue());
+         }
+     }
+ 
+     /**
+      * @return the ID of the root group
+      */
+     public String getRootGroupId() {
+         readLock.lock();
+         try {
+             return rootGroup.getIdentifier();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Sets the root group to the given group
+      *
+      * @param group the ProcessGroup that is to become the new Root Group
+      *
+      * @throws IllegalArgumentException if the ProcessGroup has a parent
+      * @throws IllegalStateException if the FlowController does not know about
+      * the given process group
+      */
+     void setRootGroup(final ProcessGroup group) {
+         if (requireNonNull(group).getParent() != null) {
+             throw new IllegalArgumentException("A ProcessGroup that has a 
parent cannot be the Root Group");
+         }
+ 
+         writeLock.lock();
+         try {
+             rootGroup = group;
+ 
+             if (externalSiteListener != null) {
+                 externalSiteListener.setRootGroup(group);
+             }
+ 
+             // update the heartbeat bean
+             this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, 
connected));
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     public SystemDiagnostics getSystemDiagnostics() {
+         final SystemDiagnosticsFactory factory = new 
SystemDiagnosticsFactory();
+         return factory.create(flowFileRepository, contentRepository);
+     }
+ 
+     //
+     // ProcessGroup access
+     //
+     /**
+      * Updates the process group corresponding to the specified DTO. Any field
+      * in DTO that is <code>null</code> (with the exception of the required 
ID)
+      * will be ignored.
+      *
+      * @param dto
+      * @return a fully-populated DTO representing the newly updated 
ProcessGroup
+      * @throws ProcessorInstantiationException
+      *
+      * @throws IllegalStateException if no process group can be found with the
+      * ID of DTO or with the ID of the DTO's parentGroupId, if the template ID
+      * specified is invalid, or if the DTO's Parent Group ID changes but the
+      * parent group has incoming or outgoing connections
+      *
+      * @throws NullPointerException if the DTO or its ID is null
+      */
+     public void updateProcessGroup(final ProcessGroupDTO dto) throws 
ProcessorInstantiationException {
+         final ProcessGroup group = lookupGroup(requireNonNull(dto).getId());
+ 
+         final String name = dto.getName();
+         final PositionDTO position = dto.getPosition();
+         final String comments = dto.getComments();
+ 
+         if (name != null) {
+             group.setName(name);
+         }
+         if (position != null) {
+             group.setPosition(toPosition(position));
+         }
+         if (comments != null) {
+             group.setComments(comments);
+         }
+     }
+ 
+     //
+     // Template access
+     //
+     /**
+      * Adds a template to this controller. The contents of this template must 
be
+      * part of the current flow. This is going create a template based on a
+      * snippet of this flow.
+      *
+      * @param dto
+      * @return a copy of the given DTO
+      * @throws IOException if an I/O error occurs when persisting the Template
+      * @throws NullPointerException if the DTO is null
+      * @throws IllegalArgumentException if does not contain all required
+      * information, such as the template name or a processor's configuration
+      * element
+      */
+     public Template addTemplate(final TemplateDTO dto) throws IOException {
+         return templateManager.addTemplate(dto);
+     }
+ 
+     /**
+      * Removes all templates from this controller
+      *
+      * @throws IOException
+      */
+     public void clearTemplates() throws IOException {
+         templateManager.clear();
+     }
+ 
+     /**
+      * Imports the specified template into this controller. The contents of 
this
+      * template may have come from another NiFi instance.
+      *
+      * @param dto
+      * @return
+      * @throws IOException
+      */
+     public Template importTemplate(final TemplateDTO dto) throws IOException {
+         return templateManager.importTemplate(dto);
+     }
+ 
+     /**
+      * Returns the template with the given ID, or <code>null</code> if no
+      * template exists with the given ID.
+      *
+      * @param id
+      * @return
+      */
+     public Template getTemplate(final String id) {
+         return templateManager.getTemplate(id);
+     }
+ 
+     public TemplateManager getTemplateManager() {
+         return templateManager;
+     }
+ 
+     /**
+      * Returns all templates that this controller knows about.
+      *
+      * @return
+      */
+     public Collection<Template> getTemplates() {
+         return templateManager.getTemplates();
+     }
+ 
+     /**
+      * Removes the template with the given ID.
+      *
+      * @param id the ID of the template to remove
+      * @throws NullPointerException if the argument is null
+      * @throws IllegalStateException if no template exists with the given ID
+      * @throws IOException if template could not be removed
+      */
+     public void removeTemplate(final String id) throws IOException, 
IllegalStateException {
+         templateManager.removeTemplate(id);
+     }
+ 
+     private Position toPosition(final PositionDTO dto) {
+         return new Position(dto.getX(), dto.getY());
+     }
+ 
+     //
+     // Snippet
+     //
+     /**
+      * Creates an instance of the given snippet and adds the components to the
+      * given group
+      *
+      * @param group
+      * @param dto
+      *
+      * @throws NullPointerException if either argument is null
+      * @throws IllegalStateException if the snippet is not valid because a
+      * component in the snippet has an ID that is not unique to this flow, or
+      * because it shares an Input Port or Output Port at the root level whose
+      * name already exists in the given ProcessGroup, or because the Template
+      * contains a Processor or a Prioritizer whose class is not valid within
+      * this instance of NiFi.
+      * @throws ProcessorInstantiationException if unable to instantiate a
+      * processor
+      */
+     public void instantiateSnippet(final ProcessGroup group, final 
FlowSnippetDTO dto) throws ProcessorInstantiationException {
+         writeLock.lock();
+         try {
+             validateSnippetContents(requireNonNull(group), dto);
+ 
+             //
+             // Instantiate the labels
+             //
+             for (final LabelDTO labelDTO : dto.getLabels()) {
+                 final Label label = createLabel(labelDTO.getId(), 
labelDTO.getLabel());
+                 label.setPosition(toPosition(labelDTO.getPosition()));
+                 if (labelDTO.getWidth() != null && labelDTO.getHeight() != 
null) {
+                     label.setSize(new Size(labelDTO.getWidth(), 
labelDTO.getHeight()));
+                 }
+ 
+                 // TODO: Update the label's "style"
+                 group.addLabel(label);
+             }
+ 
+             // 
+             // Instantiate the funnels
+             for (final FunnelDTO funnelDTO : dto.getFunnels()) {
+                 final Funnel funnel = createFunnel(funnelDTO.getId());
+                 funnel.setPosition(toPosition(funnelDTO.getPosition()));
+                 group.addFunnel(funnel);
+             }
+ 
+             //
+             // Instantiate Input Ports & Output Ports
+             //
+             for (final PortDTO portDTO : dto.getInputPorts()) {
+                 final Port inputPort;
+                 if (group.isRootGroup()) {
+                     inputPort = createRemoteInputPort(portDTO.getId(), 
portDTO.getName());
+                     
inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
+                     if (portDTO.getGroupAccessControl() != null) {
+                         ((RootGroupPort) 
inputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
+                     }
+                     if (portDTO.getUserAccessControl() != null) {
+                         ((RootGroupPort) 
inputPort).setUserAccessControl(portDTO.getUserAccessControl());
+                     }
+                 } else {
+                     inputPort = createLocalInputPort(portDTO.getId(), 
portDTO.getName());
+                 }
+ 
+                 inputPort.setPosition(toPosition(portDTO.getPosition()));
+                 inputPort.setProcessGroup(group);
+                 inputPort.setComments(portDTO.getComments());
+                 group.addInputPort(inputPort);
+             }
+ 
+             for (final PortDTO portDTO : dto.getOutputPorts()) {
+                 final Port outputPort;
+                 if (group.isRootGroup()) {
+                     outputPort = createRemoteOutputPort(portDTO.getId(), 
portDTO.getName());
+                     
outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
+                     if (portDTO.getGroupAccessControl() != null) {
+                         ((RootGroupPort) 
outputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
+                     }
+                     if (portDTO.getUserAccessControl() != null) {
+                         ((RootGroupPort) 
outputPort).setUserAccessControl(portDTO.getUserAccessControl());
+                     }
+                 } else {
+                     outputPort = createLocalOutputPort(portDTO.getId(), 
portDTO.getName());
+                 }
+ 
+                 outputPort.setPosition(toPosition(portDTO.getPosition()));
+                 outputPort.setProcessGroup(group);
+                 outputPort.setComments(portDTO.getComments());
+                 group.addOutputPort(outputPort);
+             }
+ 
+             //
+             // Instantiate the processors
+             //
+             for (final ProcessorDTO processorDTO : dto.getProcessors()) {
+                 final ProcessorNode procNode = 
createProcessor(processorDTO.getType(), processorDTO.getId());
+ 
+                 procNode.setPosition(toPosition(processorDTO.getPosition()));
+                 procNode.setProcessGroup(group);
+ 
+                 final ProcessorConfigDTO config = processorDTO.getConfig();
+                 procNode.setComments(config.getComments());
+                 if (config.isLossTolerant() != null) {
+                     procNode.setLossTolerant(config.isLossTolerant());
+                 }
+                 procNode.setName(processorDTO.getName());
+ 
+                 procNode.setYieldPeriod(config.getYieldDuration());
+                 procNode.setPenalizationPeriod(config.getPenaltyDuration());
+                 
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
+                 procNode.setAnnotationData(config.getAnnotationData());
+                 procNode.setStyle(processorDTO.getStyle());
+ 
+                 if (config.getRunDurationMillis() != null) {
+                     procNode.setRunDuration(config.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
+                 }
+ 
+                 if (config.getSchedulingStrategy() != null) {
+                     
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
+                 }
+ 
+                 // ensure that the scheduling strategy is set prior to these 
values
+                 
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
+                 procNode.setScheduldingPeriod(config.getSchedulingPeriod());
+ 
+                 final Set<Relationship> relationships = new HashSet<>();
+                 if (processorDTO.getRelationships() != null) {
+                     for (final RelationshipDTO rel : 
processorDTO.getRelationships()) {
+                         if (rel.isAutoTerminate()) {
+                             
relationships.add(procNode.getRelationship(rel.getName()));
+                         }
+                     }
+                     procNode.setAutoTerminatedRelationships(relationships);
+                 }
+ 
+                 if (config.getProperties() != null) {
+                     for (Map.Entry<String, String> entry : 
config.getProperties().entrySet()) {
+                         if (entry.getValue() != null) {
+                             procNode.setProperty(entry.getKey(), 
entry.getValue());
+                         }
+                     }
+                 }
+ 
+                 group.addProcessor(procNode);
+             }
+ 
+             //
+             // Instantiate Remote Process Groups
+             //
+             for (final RemoteProcessGroupDTO remoteGroupDTO : 
dto.getRemoteProcessGroups()) {
+                 final RemoteProcessGroup remoteGroup = 
createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri());
+                 remoteGroup.setComments(remoteGroupDTO.getComments());
+                 
remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
+                 
remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
+                 
remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
+                 remoteGroup.setProcessGroup(group);
+ 
+                 // set the input/output ports
+                 if (remoteGroupDTO.getContents() != null) {
+                     final RemoteProcessGroupContentsDTO contents = 
remoteGroupDTO.getContents();
+ 
+                     // ensure there input ports
+                     if (contents.getInputPorts() != null) {
+                         
remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()));
+                     }
+ 
+                     // ensure there are output ports
+                     if (contents.getOutputPorts() != null) {
+                         
remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()));
+                     }
+                 }
+ 
+                 group.addRemoteProcessGroup(remoteGroup);
+             }
+ 
+             // 
+             // Instantiate ProcessGroups
+             //
+             for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) {
+                 final ProcessGroup childGroup = 
createProcessGroup(groupDTO.getId());
+                 childGroup.setParent(group);
+                 childGroup.setPosition(toPosition(groupDTO.getPosition()));
+                 childGroup.setComments(groupDTO.getComments());
+                 childGroup.setName(groupDTO.getName());
+                 group.addProcessGroup(childGroup);
+ 
+                 final FlowSnippetDTO contents = groupDTO.getContents();
+ 
+                 // we want this to be recursive, so we will create a new 
template that contains only
+                 // the contents of this child group and recursively call 
ourselves.
+                 final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO();
+                 childTemplateDTO.setConnections(contents.getConnections());
+                 childTemplateDTO.setInputPorts(contents.getInputPorts());
+                 childTemplateDTO.setLabels(contents.getLabels());
+                 childTemplateDTO.setOutputPorts(contents.getOutputPorts());
+                 
childTemplateDTO.setProcessGroups(contents.getProcessGroups());
+                 childTemplateDTO.setProcessors(contents.getProcessors());
+                 childTemplateDTO.setFunnels(contents.getFunnels());
+                 
childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
+                 instantiateSnippet(childGroup, childTemplateDTO);
+             }
+ 
+             //
+             // Instantiate Connections
+             //
+             for (final ConnectionDTO connectionDTO : dto.getConnections()) {
+                 final ConnectableDTO sourceDTO = connectionDTO.getSource();
+                 final ConnectableDTO destinationDTO = 
connectionDTO.getDestination();
+                 final Connectable source;
+                 final Connectable destination;
+ 
+                 // locate the source and destination connectable. if this is 
a remote port 
+                 // we need to locate the remote process groups. otherwise we 
need to 
+                 // find the connectable given its parent group.
+                 // NOTE: (getConnectable returns ANY connectable, when the 
parent is
+                 // not this group only input ports or output ports should be 
returned. if something 
+                 // other than a port is returned, an exception will be thrown 
when adding the 
+                 // connection below.)
+                 // see if the source connectable is a remote port
+                 if 
(ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) {
+                     final RemoteProcessGroup remoteGroup = 
group.getRemoteProcessGroup(sourceDTO.getGroupId());
+                     source = remoteGroup.getOutputPort(sourceDTO.getId());
+                 } else {
+                     final ProcessGroup sourceGroup = 
getConnectableParent(group, sourceDTO.getGroupId());
+                     source = sourceGroup.getConnectable(sourceDTO.getId());
+                 }
+ 
+                 // see if the destination connectable is a remote port
+                 if 
(ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) {
+                     final RemoteProcessGroup remoteGroup = 
group.getRemoteProcessGroup(destinationDTO.getGroupId());
+                     destination = 
remoteGroup.getInputPort(destinationDTO.getId());
+                 } else {
+                     final ProcessGroup destinationGroup = 
getConnectableParent(group, destinationDTO.getGroupId());
+                     destination = 
destinationGroup.getConnectable(destinationDTO.getId());
+                 }
+ 
+                 // determine the selection relationships for this connection
+                 final Set<String> relationships = new HashSet<>();
+                 if (connectionDTO.getSelectedRelationships() != null) {
+                     
relationships.addAll(connectionDTO.getSelectedRelationships());
+                 }
+ 
+                 final Connection connection = 
createConnection(connectionDTO.getId(), connectionDTO.getName(), source, 
destination, relationships);
+ 
+                 if (connectionDTO.getBends() != null) {
+                     final List<Position> bendPoints = new ArrayList<>();
+                     for (final PositionDTO bend : connectionDTO.getBends()) {
+                         bendPoints.add(new Position(bend.getX(), 
bend.getY()));
+                     }
+                     connection.setBendPoints(bendPoints);
+                 }
+ 
+                 final FlowFileQueue queue = connection.getFlowFileQueue();
+                 
queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
+                 
queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
+                 
queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration());
+ 
+                 final List<String> prioritizers = 
connectionDTO.getPrioritizers();
+                 if (prioritizers != null) {
+                     final List<String> newPrioritizersClasses = new 
ArrayList<>(prioritizers);
+                     final List<FlowFilePrioritizer> newPrioritizers = new 
ArrayList<>();
+                     for (final String className : newPrioritizersClasses) {
+                         try {
+                             newPrioritizers.add(createPrioritizer(className));
+                         } catch (final ClassNotFoundException | 
InstantiationException | IllegalAccessException e) {
+                             throw new IllegalArgumentException("Unable to set 
prioritizer " + className + ": " + e);
+                         }
+                     }
+                     queue.setPriorities(newPrioritizers);
+                 }
+ 
+                 connection.setProcessGroup(group);
+                 group.addConnection(connection);
+             }
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Converts a set of ports into a set of remote process group ports.
+      *
+      * @param ports
+      * @return
+      */
+     private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final 
Set<RemoteProcessGroupPortDTO> ports) {
+         Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
+         if (ports != null) {
+             remotePorts = new LinkedHashSet<>(ports.size());
+             for (RemoteProcessGroupPortDTO port : ports) {
+                 final StandardRemoteProcessGroupPortDescriptor descriptor = 
new StandardRemoteProcessGroupPortDescriptor();
+                 descriptor.setId(port.getId());
+                 descriptor.setName(port.getName());
+                 descriptor.setComments(port.getComments());
+                 descriptor.setTargetRunning(port.isTargetRunning());
+                 descriptor.setConnected(port.isConnected());
+                 
descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount());
+                 descriptor.setTransmitting(port.isTransmitting());
+                 descriptor.setUseCompression(port.getUseCompression());
+                 remotePorts.add(descriptor);
+             }
+         }
+         return remotePorts;
+     }
+ 
+     /**
+      * Returns the parent of the specified Connectable. This only considers 
this
+      * group and any direct child sub groups.
+      *
+      * @param parentGroupId
+      * @return
+      */
+     private ProcessGroup getConnectableParent(final ProcessGroup group, final 
String parentGroupId) {
+         if (areGroupsSame(group.getIdentifier(), parentGroupId)) {
+             return group;
+         } else {
+             return group.getProcessGroup(parentGroupId);
+         }
+     }
+ 
+     /**
+      * <p>
+      * Verifies that the given DTO is valid, according to the following:
+      *
+      * <ul>
+      * <li>None of the ID's in any component of the DTO can be used in this
+      * flow.</li>
+      * <li>The ProcessGroup to which the template's contents will be added 
must
+      * not contain any InputPort or OutputPort with the same name as one of 
the
+      * corresponding components in the root level of the template.</li>
+      * <li>All Processors' classes must exist in this instance.</li>
+      * <li>All Flow File Prioritizers' classes must exist in this 
instance.</li>
+      * </ul>
+      * </p>
+      *
+      * <p>
+      * If any of the above statements does not hold true, an
+      * {@link IllegalStateException} or a
+      * {@link ProcessorInstantiationException} will be thrown.
+      * </p>
+      *
+      * @param group
+      * @param templateContents
+      */
+     private void validateSnippetContents(final ProcessGroup group, final 
FlowSnippetDTO templateContents) {
+         // validate the names of Input Ports
+         for (final PortDTO port : templateContents.getInputPorts()) {
+             if (group.getInputPortByName(port.getName()) != null) {
+                 throw new IllegalStateException("ProcessGroup already has an 
Input Port with name " + port.getName());
+             }
+         }
+ 
+         // validate the names of Output Ports
+         for (final PortDTO port : templateContents.getOutputPorts()) {
+             if (group.getOutputPortByName(port.getName()) != null) {
+                 throw new IllegalStateException("ProcessGroup already has an 
Output Port with name " + port.getName());
+             }
+         }
+ 
+         // validate that all Processor Types and Prioritizer Types are valid
+         final List<String> processorClasses = new ArrayList<>();
+         for (final Class<?> c : 
ExtensionManager.getExtensions(Processor.class)) {
+             processorClasses.add(c.getName());
+         }
+         final List<String> prioritizerClasses = new ArrayList<>();
+         for (final Class<?> c : 
ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
+             prioritizerClasses.add(c.getName());
+         }
+ 
+         final Set<ProcessorDTO> allProcs = new HashSet<>();
+         final Set<ConnectionDTO> allConns = new HashSet<>();
+         allProcs.addAll(templateContents.getProcessors());
+         allConns.addAll(templateContents.getConnections());
+         for (final ProcessGroupDTO childGroup : 
templateContents.getProcessGroups()) {
+             allProcs.addAll(findAllProcessors(childGroup));
+             allConns.addAll(findAllConnections(childGroup));
+         }
+ 
+         for (final ProcessorDTO proc : allProcs) {
+             if (!processorClasses.contains(proc.getType())) {
+                 throw new IllegalStateException("Invalid Processor Type: " + 
proc.getType());
+             }
+         }
+ 
+         for (final ConnectionDTO conn : allConns) {
+             final List<String> prioritizers = conn.getPrioritizers();
+             if (prioritizers != null) {
+                 for (final String prioritizer : prioritizers) {
+                     if (!prioritizerClasses.contains(prioritizer)) {
+                         throw new IllegalStateException("Invalid FlowFile 
Prioritizer Type: " + prioritizer);
+                     }
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Recursively finds all ProcessorDTO's
+      *
+      * @param group
+      * @return
+      */
+     private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) {
+         final Set<ProcessorDTO> procs = new HashSet<>();
+         for (final ProcessorDTO dto : group.getContents().getProcessors()) {
+             procs.add(dto);
+         }
+ 
+         for (final ProcessGroupDTO childGroup : 
group.getContents().getProcessGroups()) {
+             procs.addAll(findAllProcessors(childGroup));
+         }
+         return procs;
+     }
+ 
+     /**
+      * Recursively finds all ConnectionDTO's
+      *
+      * @param group
+      * @return
+      */
+     private Set<ConnectionDTO> findAllConnections(final ProcessGroupDTO 
group) {
+         final Set<ConnectionDTO> conns = new HashSet<>();
+         for (final ConnectionDTO dto : group.getContents().getConnections()) {
+             conns.add(dto);
+         }
+ 
+         for (final ProcessGroupDTO childGroup : 
group.getContents().getProcessGroups()) {
+             conns.addAll(findAllConnections(childGroup));
+         }
+         return conns;
+     }
+ 
+     //
+     // Processor access
+     //
+     /**
+      * Indicates whether or not the two ID's point to the same ProcessGroup. 
If
+      * either id is null, will return <code>false</code.
+      *
+      * @param id1
+      * @param id2
+      * @return
+      */
+     public boolean areGroupsSame(final String id1, final String id2) {
+         if (id1 == null || id2 == null) {
+             return false;
+         } else if (id1.equals(id2)) {
+             return true;
+         } else {
+             final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id1);
+             final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id2);
+             return (comparable1.equals(comparable2));
+         }
+     }
+ 
+     public FlowFilePrioritizer createPrioritizer(final String type) throws 
InstantiationException, IllegalAccessException, ClassNotFoundException {
+         FlowFilePrioritizer prioritizer;
+ 
+         final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
+         try {
+             final ClassLoader detectedClassLoaderForType = 
ExtensionManager.getClassLoader(type);
+             final Class<?> rawClass;
+             if (detectedClassLoaderForType == null) {
+                 // try to find from the current class loader
+                 rawClass = Class.forName(type);
+             } else {
+                 // try to find from the registered classloader for that type
+                 rawClass = Class.forName(type, true, 
ExtensionManager.getClassLoader(type));
+             }
+ 
+             
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+             final Class<? extends FlowFilePrioritizer> prioritizerClass = 
rawClass.asSubclass(FlowFilePrioritizer.class);
+             final Object processorObj = prioritizerClass.newInstance();
+             prioritizer = prioritizerClass.cast(processorObj);
+ 
+             return prioritizer;
+         } finally {
+             if (ctxClassLoader != null) {
+                 Thread.currentThread().setContextClassLoader(ctxClassLoader);
+             }
+         }
+     }
+ 
+     //
+     // InputPort access
+     //
+     public PortDTO updateInputPort(final String parentGroupId, final PortDTO 
dto) {
+         final ProcessGroup parentGroup = lookupGroup(parentGroupId);
+         final Port port = parentGroup.getInputPort(dto.getId());
+         if (port == null) {
+             throw new IllegalStateException("No Input Port with ID " + 
dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId);
+         }
+ 
+         final String name = dto.getName();
+         if (dto.getPosition() != null) {
+             port.setPosition(toPosition(dto.getPosition()));
+         }
+ 
+         if (name != null) {
+             port.setName(name);
+         }
+ 
+         return createDTO(port);
+     }
+ 
+     private PortDTO createDTO(final Port port) {
+         if (port == null) {
+             return null;
+         }
+ 
+         final PortDTO dto = new PortDTO();
+         dto.setId(port.getIdentifier());
+         dto.setPosition(new PositionDTO(port.getPosition().getX(), 
port.getPosition().getY()));
+         dto.setName(port.getName());
+         dto.setParentGroupId(port.getProcessGroup().getIdentifier());
+ 
+         return dto;
+     }
+ 
+     //
+     // OutputPort access
+     //
+     public PortDTO updateOutputPort(final String parentGroupId, final PortDTO 
dto) {
+         final ProcessGroup parentGroup = lookupGroup(parentGroupId);
+         final Port port = parentGroup.getOutputPort(dto.getId());
+         if (port == null) {
+             throw new IllegalStateException("No Output Port with ID " + 
dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId);
+         }
+ 
+         final String name = dto.getName();
+         if (name != null) {
+             port.setName(name);
+         }
+ 
+         if (dto.getPosition() != null) {
+             port.setPosition(toPosition(dto.getPosition()));
+         }
+ 
+         return createDTO(port);
+     }
+ 
+     //
+     // Processor/Prioritizer/Filter Class Access
+     //
+     @SuppressWarnings("rawtypes")
+     public Set<Class> getFlowFileProcessorClasses() {
+         return ExtensionManager.getExtensions(Processor.class);
+     }
+ 
+     @SuppressWarnings("rawtypes")
+     public Set<Class> getFlowFileComparatorClasses() {
+         return ExtensionManager.getExtensions(FlowFilePrioritizer.class);
+     }
+ 
+     /**
+      * Returns the ProcessGroup with the given ID
+      *
+      * @param id
+      * @return the process group or null if not group is found
+      */
+     private ProcessGroup lookupGroup(final String id) {
+         final ProcessGroup group = getGroup(id);
+         if (group == null) {
+             throw new IllegalStateException("No Group with ID " + id + " 
exists");
+         }
+         return group;
+     }
+ 
+     /**
+      * Returns the ProcessGroup with the given ID
+      *
+      * @param id
+      * @return the process group or null if not group is found
+      */
+     public ProcessGroup getGroup(final String id) {
+         requireNonNull(id);
+         final ProcessGroup root;
+         readLock.lock();
+         try {
+             root = rootGroup;
+         } finally {
+             readLock.unlock();
+         }
+ 
+         final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id;
+         return (root == null) ? null : root.findProcessGroup(searchId);
+     }
+ 
+     @Override
+     public ProcessGroupStatus getControllerStatus() {
+         return getGroupStatus(getRootGroupId());
+     }
+ 
+     public ProcessGroupStatus getGroupStatus(final String groupId) {
+         return getGroupStatus(groupId, getProcessorStats());
+     }
+ 
+     public ProcessGroupStatus getGroupStatus(final String groupId, final 
RepositoryStatusReport statusReport) {
+         final ProcessGroup group = getGroup(groupId);
+         return getGroupStatus(group, statusReport);
+     }
+ 
+     public ProcessGroupStatus getGroupStatus(final ProcessGroup group, final 
RepositoryStatusReport statusReport) {
+         if (group == null) {
+             return null;
+         }
+ 
+         final ProcessGroupStatus status = new ProcessGroupStatus();
+         status.setId(group.getIdentifier());
+         status.setName(group.getName());
+         status.setCreationTimestamp(new Date().getTime());
+         int activeGroupThreads = 0;
+         long bytesRead = 0L;
+         long bytesWritten = 0L;
+         int queuedCount = 0;
+         long queuedContentSize = 0L;
+         int flowFilesIn = 0;
+         long bytesIn = 0L;
+         int flowFilesOut = 0;
+         long bytesOut = 0L;
+         int flowFilesReceived = 0;
+         long bytesReceived = 0L;
+         int flowFilesSent = 0;
+         long bytesSent = 0L;
+ 
+         // set status for processors
+         final Collection<ProcessorStatus> processorStatusCollection = new 
ArrayList<>();
+         status.setProcessorStatus(processorStatusCollection);
+

<TRUNCATED>

Reply via email to