http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 0000000,346e801..1b7a3c0 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@@ -1,0 -1,3579 +1,3643 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.controller; + + import static java.util.Objects.requireNonNull; + + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.text.DateFormat; + import java.text.SimpleDateFormat; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; + import java.util.Date; + import java.util.HashSet; + import java.util.LinkedHashSet; + import java.util.List; + import java.util.Locale; + import java.util.Map; + import java.util.Set; + import java.util.UUID; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ScheduledFuture; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + + import javax.net.ssl.SSLContext; + + import org.apache.nifi.admin.service.UserService; ++import org.apache.nifi.annotation.lifecycle.OnAdded; ++import org.apache.nifi.annotation.lifecycle.OnRemoved; + import org.apache.nifi.cluster.BulletinsPayload; + import org.apache.nifi.cluster.HeartbeatPayload; + import org.apache.nifi.cluster.protocol.DataFlow; + import org.apache.nifi.cluster.protocol.Heartbeat; + import org.apache.nifi.cluster.protocol.NodeBulletins; + import org.apache.nifi.cluster.protocol.NodeIdentifier; + import org.apache.nifi.cluster.protocol.NodeProtocolSender; + import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; + import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; + import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; + import org.apache.nifi.connectable.Connectable; + import org.apache.nifi.connectable.ConnectableType; + import org.apache.nifi.connectable.Connection; + import org.apache.nifi.connectable.Funnel; + import org.apache.nifi.connectable.LocalPort; + import org.apache.nifi.connectable.Port; + import org.apache.nifi.connectable.Position; + import org.apache.nifi.connectable.Size; + import org.apache.nifi.connectable.StandardConnection; + import org.apache.nifi.controller.exception.CommunicationsException; + import org.apache.nifi.controller.exception.ProcessorInstantiationException; + import org.apache.nifi.controller.exception.ProcessorLifeCycleException; + import org.apache.nifi.controller.label.Label; + import org.apache.nifi.controller.label.StandardLabel; + import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; + import org.apache.nifi.controller.reporting.StandardReportingTaskNode; + import org.apache.nifi.controller.repository.ContentRepository; + import org.apache.nifi.controller.repository.CounterRepository; + import org.apache.nifi.controller.repository.FlowFileEvent; + import org.apache.nifi.controller.repository.FlowFileEventRepository; + import org.apache.nifi.controller.repository.FlowFileRecord; + import org.apache.nifi.controller.repository.FlowFileRepository; + import org.apache.nifi.controller.repository.FlowFileSwapManager; + import org.apache.nifi.controller.repository.QueueProvider; + import org.apache.nifi.controller.repository.RepositoryRecord; + import org.apache.nifi.controller.repository.RepositoryStatusReport; + import org.apache.nifi.controller.repository.StandardCounterRepository; + import org.apache.nifi.controller.repository.StandardFlowFileRecord; + import org.apache.nifi.controller.repository.StandardRepositoryRecord; + import org.apache.nifi.controller.repository.claim.ContentClaim; + import org.apache.nifi.controller.repository.claim.ContentClaimManager; + import org.apache.nifi.controller.repository.claim.ContentDirection; + import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; + import org.apache.nifi.controller.repository.io.LimitedInputStream; + import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent; + import org.apache.nifi.controller.scheduling.ProcessContextFactory; + import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; + import org.apache.nifi.controller.scheduling.StandardProcessScheduler; + import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; + import org.apache.nifi.controller.service.ControllerServiceNode; + import org.apache.nifi.controller.service.ControllerServiceProvider; + import org.apache.nifi.controller.service.StandardControllerServiceProvider; + import org.apache.nifi.controller.status.ConnectionStatus; + import org.apache.nifi.controller.status.PortStatus; + import org.apache.nifi.controller.status.ProcessGroupStatus; + import org.apache.nifi.controller.status.ProcessorStatus; + import org.apache.nifi.controller.status.RemoteProcessGroupStatus; + import org.apache.nifi.controller.status.RunStatus; + import org.apache.nifi.controller.status.TransmissionStatus; + import org.apache.nifi.controller.status.history.ComponentStatusRepository; + import org.apache.nifi.controller.status.history.StatusHistoryUtil; + import org.apache.nifi.controller.tasks.ExpireFlowFiles; + import org.apache.nifi.diagnostics.SystemDiagnostics; + import org.apache.nifi.diagnostics.SystemDiagnosticsFactory; + import org.apache.nifi.encrypt.StringEncryptor; + import org.apache.nifi.engine.FlowEngine; + import org.apache.nifi.events.BulletinFactory; + import org.apache.nifi.events.EventReporter; + import org.apache.nifi.events.NodeBulletinProcessingStrategy; + import org.apache.nifi.events.VolatileBulletinRepository; + import org.apache.nifi.flowfile.FlowFilePrioritizer; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.framework.security.util.SslContextFactory; + import org.apache.nifi.groups.ProcessGroup; + import org.apache.nifi.groups.RemoteProcessGroup; + import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; + import org.apache.nifi.groups.StandardProcessGroup; + import org.apache.nifi.stream.io.StreamUtils; + import org.apache.nifi.logging.LogLevel; + import org.apache.nifi.logging.LogRepository; + import org.apache.nifi.logging.LogRepositoryFactory; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.logging.ProcessorLogObserver; + import org.apache.nifi.nar.ExtensionManager; ++import org.apache.nifi.nar.NarClassLoader; + import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.nar.NarThreadContextClassLoader; + import org.apache.nifi.processor.Processor; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.QueueSize; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.SimpleProcessLogger; + import org.apache.nifi.processor.StandardProcessorInitializationContext; + import org.apache.nifi.processor.StandardValidationContextFactory; -import org.apache.nifi.processor.annotation.OnAdded; + import org.apache.nifi.provenance.ProvenanceEventRecord; + import org.apache.nifi.provenance.ProvenanceEventRepository; + import org.apache.nifi.provenance.ProvenanceEventType; + import org.apache.nifi.provenance.StandardProvenanceEventRecord; + import org.apache.nifi.remote.RemoteGroupPort; + import org.apache.nifi.remote.RemoteResourceManager; + import org.apache.nifi.remote.RemoteSiteListener; + import org.apache.nifi.remote.RootGroupPort; + import org.apache.nifi.remote.SocketRemoteSiteListener; + import org.apache.nifi.remote.StandardRemoteProcessGroup; + import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; + import org.apache.nifi.remote.StandardRootGroupPort; + import org.apache.nifi.remote.TransferDirection; + import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol; + import org.apache.nifi.reporting.Bulletin; + import org.apache.nifi.reporting.BulletinRepository; + import org.apache.nifi.reporting.EventAccess; + import org.apache.nifi.reporting.ReportingTask; + import org.apache.nifi.reporting.Severity; + import org.apache.nifi.scheduling.SchedulingStrategy; + import org.apache.nifi.util.FormatUtils; + import org.apache.nifi.util.NiFiProperties; + import org.apache.nifi.util.ReflectionUtils; + import org.apache.nifi.web.api.dto.ConnectableDTO; + import org.apache.nifi.web.api.dto.ConnectionDTO; + import org.apache.nifi.web.api.dto.FlowSnippetDTO; + import org.apache.nifi.web.api.dto.FunnelDTO; + import org.apache.nifi.web.api.dto.LabelDTO; + import org.apache.nifi.web.api.dto.PortDTO; + import org.apache.nifi.web.api.dto.PositionDTO; + import org.apache.nifi.web.api.dto.ProcessGroupDTO; + import org.apache.nifi.web.api.dto.ProcessorConfigDTO; + import org.apache.nifi.web.api.dto.ProcessorDTO; + import org.apache.nifi.web.api.dto.RelationshipDTO; + import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; + import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; + import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; + import org.apache.nifi.web.api.dto.TemplateDTO; + import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; + import org.apache.commons.lang3.StringUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import com.sun.jersey.api.client.ClientHandlerException; + + public class FlowController implements EventAccess, ControllerServiceProvider, Heartbeater, QueueProvider { + + // default repository implementations + public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository"; + public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository"; + public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository"; + public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager"; + public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository"; + + public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds"; + public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds"; + public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10; + public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures + + public static final String ROOT_GROUP_ID_ALIAS = "root"; + public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow"; + + private final AtomicInteger maxTimerDrivenThreads; + private final AtomicInteger maxEventDrivenThreads; + private final AtomicReference<FlowEngine> timerDrivenEngineRef; + private final AtomicReference<FlowEngine> eventDrivenEngineRef; + + private final ContentRepository contentRepository; + private final FlowFileRepository flowFileRepository; + private final FlowFileEventRepository flowFileEventRepository; + private final ProvenanceEventRepository provenanceEventRepository; + private final VolatileBulletinRepository bulletinRepository; + private final StandardProcessScheduler processScheduler; + private final TemplateManager templateManager; + private final SnippetManager snippetManager; + private final long gracefulShutdownSeconds; + private final ExtensionManager extensionManager; + private final NiFiProperties properties; + private final SSLContext sslContext; + private final RemoteSiteListener externalSiteListener; + private final AtomicReference<CounterRepository> counterRepositoryRef; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final ControllerServiceProvider controllerServiceProvider; + private final UserService userService; + private final EventDrivenWorkerQueue eventDrivenWorkerQueue; + private final ComponentStatusRepository componentStatusRepository; + private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started + private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>(); + + // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may + // change while the instance is running. We do this because we want to generate heartbeats even if we + // are unable to obtain a read lock on the entire FlowController. + private final AtomicReference<HeartbeatBean> heartbeatBeanRef = new AtomicReference<>(); + private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false); + + private final Integer remoteInputSocketPort; + private final Boolean isSiteToSiteSecure; + private Integer clusterManagerRemoteSitePort = null; + private Boolean clusterManagerRemoteSiteCommsSecure = null; + + private ProcessGroup rootGroup; + private final List<Connectable> startConnectablesAfterInitialization; + private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization; + + /** + * true if controller is configured to operate in a clustered environment + */ + private final boolean configuredForClustering; + + /** + * the time to wait between heartbeats + */ + private final int heartbeatDelaySeconds; + + /** + * The sensitive property string encryptor * + */ + private final StringEncryptor encryptor; + + /** + * cluster protocol sender + */ + private final NodeProtocolSender protocolSender; + + private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks"); + private final ContentClaimManager contentClaimManager = new StandardContentClaimManager(); + + // guarded by rwLock + /** + * timer to periodically send heartbeats to the cluster + */ + private ScheduledFuture<?> bulletinFuture; + private ScheduledFuture<?> heartbeatGeneratorFuture; + private ScheduledFuture<?> heartbeatSenderFuture; + + // guarded by FlowController lock + /** + * timer task to generate heartbeats + */ + private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null); + + private AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber; + + // guarded by rwLock + /** + * the node identifier; + */ + private NodeIdentifier nodeId; + + // guarded by rwLock + /** + * true if controller is connected or trying to connect to the cluster + */ + private boolean clustered; + private String clusterManagerDN; + + // guarded by rwLock + /** + * true if controller is the primary of the cluster + */ + private boolean primary; + + // guarded by rwLock + /** + * true if connected to a cluster + */ + private boolean connected; + + // guarded by rwLock + private String instanceId; + + private volatile boolean shutdown = false; + + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + private FlowFileSwapManager flowFileSwapManager; // guarded by read/write lock + + private static final Logger LOG = LoggerFactory.getLogger(FlowController.class); + private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"); + + public static FlowController createStandaloneInstance( + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final UserService userService, + final StringEncryptor encryptor) { + return new FlowController( + flowFileEventRepo, + properties, + userService, + encryptor, + /* configuredForClustering */ false, + /* NodeProtocolSender */ null); + } + + public static FlowController createClusteredInstance( + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final UserService userService, + final StringEncryptor encryptor, + final NodeProtocolSender protocolSender) { + final FlowController flowController = new FlowController( + flowFileEventRepo, + properties, + userService, + encryptor, + /* configuredForClustering */ true, + /* NodeProtocolSender */ protocolSender); + + flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure()); + + return flowController; + } + + private FlowController( + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final UserService userService, + final StringEncryptor encryptor, + final boolean configuredForClustering, + final NodeProtocolSender protocolSender) { + + maxTimerDrivenThreads = new AtomicInteger(10); + maxEventDrivenThreads = new AtomicInteger(5); + + this.encryptor = encryptor; + this.properties = properties; + sslContext = SslContextFactory.createSslContext(properties, false); + extensionManager = new ExtensionManager(); + controllerServiceProvider = new StandardControllerServiceProvider(); + + timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); + eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process")); + + final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager); + flowFileRepository = flowFileRepo; + flowFileEventRepository = flowFileEventRepo; + counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); + + bulletinRepository = new VolatileBulletinRepository(); + nodeBulletinSubscriber = new AtomicReference<>(); + + try { + this.provenanceEventRepository = createProvenanceRepository(properties); + this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository)); + + this.contentRepository = createContentRepository(properties); + } catch (final Exception e) { + throw new RuntimeException("Unable to create Provenance Repository", e); + } + + processScheduler = new StandardProcessScheduler(this, this, encryptor); + eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); + + final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository); + processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( + eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); + + final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); + final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); + processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); + processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); + processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); + processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); + + startConnectablesAfterInitialization = new ArrayList<>(); + startRemoteGroupPortsAfterInitialization = new ArrayList<>(); + this.userService = userService; + + final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD); + long shutdownSecs; + try { + shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal); + if (shutdownSecs < 1) { + shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS; + } + } catch (final NumberFormatException nfe) { + shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS; + } + gracefulShutdownSeconds = shutdownSecs; + + remoteInputSocketPort = properties.getRemoteInputPort(); + isSiteToSiteSecure = properties.isSiteToSiteSecure(); + + if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) { + throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured"); + } + + this.configuredForClustering = configuredForClustering; + this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS); + this.protocolSender = protocolSender; + try { + this.templateManager = new TemplateManager(properties.getTemplateDirectory()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + this.snippetManager = new SnippetManager(); + + rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor); + rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); + instanceId = UUID.randomUUID().toString(); + + if (remoteInputSocketPort == null){ + LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set"); + externalSiteListener = null; + } else if (isSiteToSiteSecure && sslContext == null) { + LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed."); + externalSiteListener = null; + } else { + // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol + RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class); + externalSiteListener = new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null); + externalSiteListener.setRootGroup(rootGroup); + } + + // Determine frequency for obtaining component status snapshots + final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + long snapshotMillis; + try { + snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); + } + + componentStatusRepository = createComponentStatusRepository(); + timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + componentStatusRepository.capture(getControllerStatus()); + } + }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); + + heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false)); + } + + private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) { + final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: " + + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION); + } + + try { + final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class); + synchronized (created) { + created.initialize(contentClaimManager); + } + return created; + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private static FlowFileSwapManager createSwapManager(final NiFiProperties properties) { + final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION); + if (implementationClassName == null) { + return null; + } + + try { + return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) { + return new EventReporter() { + @Override + public void reportEvent(final Severity severity, final String category, final String message) { + final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); + bulletinRepository.addBulletin(bulletin); + } + }; + } + + public void initializeFlow() throws IOException { + writeLock.lock(); + try { + flowFileSwapManager = createSwapManager(properties); + + long maxIdFromSwapFiles = -1L; + if (flowFileSwapManager != null) { + if (flowFileRepository.isVolatile()) { + flowFileSwapManager.purge(); + } else { + maxIdFromSwapFiles = flowFileSwapManager.recoverSwappedFlowFiles(this, contentClaimManager); + } + } + + flowFileRepository.loadFlowFiles(this, maxIdFromSwapFiles + 1); + + // now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the + // ContentRepository to purge superfluous files + contentRepository.cleanup(); + + if (flowFileSwapManager != null) { + flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository)); + } + + if (externalSiteListener != null) { + externalSiteListener.start(); + } + + timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + updateRemoteProcessGroups(); + } catch (final Throwable t) { + LOG.warn("Unable to update Remote Process Groups due to " + t); + if (LOG.isDebugEnabled()) { + LOG.warn("", t); + } + } + } + }, 0L, 30L, TimeUnit.SECONDS); + + initialized.set(true); + } finally { + writeLock.unlock(); + } + } + + /** + * <p> + * Causes any processors that were added to the flow with a 'delayStart' + * flag of true to now start + * </p> + */ + public void startDelayed() { + writeLock.lock(); + try { + LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size())); + for (final Connectable connectable : startConnectablesAfterInitialization) { + if (connectable.getScheduledState() == ScheduledState.DISABLED) { + continue; + } + + try { + if (connectable instanceof ProcessorNode) { + connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); + } else { + startConnectable(connectable); + } + } catch (final Throwable t) { + LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); + } + } + + startConnectablesAfterInitialization.clear(); + + int startedTransmitting = 0; + for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) { + try { + remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort); + startedTransmitting++; + } catch (final Throwable t) { + LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t}); + } + } + + LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting); + startRemoteGroupPortsAfterInitialization.clear(); + } finally { + writeLock.unlock(); + } + } + + private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: " + + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION); + } + + try { + final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class); + synchronized (contentRepo) { + contentRepo.initialize(contentClaimManager); + } + return contentRepo; + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private ProvenanceEventRepository createProvenanceRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: " + + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS); + } + + try { + return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceEventRepository.class); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private ComponentStatusRepository createComponentStatusRepository() { + final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " + + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); + } + + try { + return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Creates a connection between two Connectable objects. + * + * @param id required ID of the connection + * @param name the name of the connection, or <code>null</code> to leave the + * connection unnamed + * @param source required source + * @param destination required destination + * @param relationshipNames required collection of relationship names + * @return + * + * @throws NullPointerException if the ID, source, destination, or set of + * relationships is null. + * @throws IllegalArgumentException if <code>relationships</code> is an + * empty collection + */ + public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) { + final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler); + + final List<Relationship> relationships = new ArrayList<>(); + for (final String relationshipName : requireNonNull(relationshipNames)) { + relationships.add(new Relationship.Builder().name(relationshipName).build()); + } + + return builder.id(requireNonNull(id).intern()).name(name == null ? null : name.intern()).relationships(relationships).source(requireNonNull(source)).destination(destination).build(); + } + + /** + * Creates a new Label + * + * @param id + * @param text + * @return + * @throws NullPointerException if either argument is null + */ + public Label createLabel(final String id, final String text) { + return new StandardLabel(requireNonNull(id).intern(), text); + } + + /** + * Creates a funnel + * + * @param id + * @return + */ + public Funnel createFunnel(final String id) { + return new StandardFunnel(id.intern(), null, processScheduler); + } + + /** + * Creates a Port to use as an Input Port for a Process Group + * + * @param id + * @param name + * @return + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. + */ + public Port createLocalInputPort(String id, String name) { + id = requireNonNull(id).intern(); + name = requireNonNull(name).intern(); + verifyPortIdDoesNotExist(id); + return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, processScheduler); + } + + /** + * Creates a Port to use as an Output Port for a Process Group + * + * @param id + * @param name + * @return + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. + */ + public Port createLocalOutputPort(String id, String name) { + id = requireNonNull(id).intern(); + name = requireNonNull(name).intern(); + verifyPortIdDoesNotExist(id); + return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, processScheduler); + } + + /** + * Creates a ProcessGroup with the given ID + * + * @param id + * @return + * @throws NullPointerException if the argument is null + */ + public ProcessGroup createProcessGroup(final String id) { + return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor); + } + + /** + * <p> + * Creates a new ProcessorNode with the given type and identifier and initializes it invoking the + * methods annotated with {@link OnAdded}. + * </p> + * + * @param type + * @param id + * @return + * @throws NullPointerException if either arg is null + * @throws ProcessorInstantiationException if the processor cannot be + * instantiated for any reason + */ + public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException { + return createProcessor(type, id, true); + } + + /** + * <p> + * Creates a new ProcessorNode with the given type and identifier and optionally initializes it. + * </p> + * + * @param type the fully qualified Processor class name + * @param id the unique ID of the Processor + * @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true}, + * will invoke methods annotated with the {@link OnAdded} annotation. + * @return + * @throws NullPointerException if either arg is null + * @throws ProcessorInstantiationException if the processor cannot be + * instantiated for any reason + */ ++ @SuppressWarnings("deprecation") + public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException { + id = id.intern(); + final Processor processor = instantiateProcessor(type, id); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); + final ProcessorNode procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider); + + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); + + if ( firstTimeAdded ) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); ++ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, org.apache.nifi.processor.annotation.OnAdded.class, processor); + } catch (final Exception e) { + logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); + throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e); + } + } + + return procNode; + } + + private Processor instantiateProcessor(final String type, final String identifier) throws ProcessorInstantiationException { + Processor processor; + + final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); + final Class<?> rawClass; + if (detectedClassLoaderForType == null) { + // try to find from the current class loader + rawClass = Class.forName(type); + } else { + // try to find from the registered classloader for that type + rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); + } + + Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); + final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class); + processor = processorClass.newInstance(); + final ProcessorLog processorLogger = new SimpleProcessLogger(identifier, processor); + final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, processorLogger, this); + processor.initialize(ctx); + return processor; + } catch (final Throwable t) { + throw new ProcessorInstantiationException(type, t); + } finally { + if (ctxClassLoader != null) { + Thread.currentThread().setContextClassLoader(ctxClassLoader); + } + } + } + + /** + * @return the ExtensionManager used for instantiating Processors, + * Prioritizers, etc. + */ + public ExtensionManager getExtensionManager() { + return extensionManager; + } + + public String getInstanceId() { + readLock.lock(); + try { + return instanceId; + } finally { + readLock.unlock(); + } + } + + /** + * Gets the BulletinRepository for storing and retrieving Bulletins. + * + * @return + */ + public BulletinRepository getBulletinRepository() { + return bulletinRepository; + } + + public SnippetManager getSnippetManager() { + return snippetManager; + } + + /** + * Creates a Port to use as an Input Port for the root Process Group, which + * is used for Site-to-Site communications + * + * @param id + * @param name + * @return + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. + */ + public Port createRemoteInputPort(String id, String name) { + id = requireNonNull(id).intern(); + name = requireNonNull(name).intern(); + verifyPortIdDoesNotExist(id); + return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); + } + + /** + * Creates a Port to use as an Output Port for the root Process Group, which + * is used for Site-to-Site communications and will queue flow files waiting + * to be delivered to remote instances + * + * @param id + * @param name + * @return + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. + */ + public Port createRemoteOutputPort(String id, String name) { + id = requireNonNull(id).intern(); + name = requireNonNull(name).intern(); + verifyPortIdDoesNotExist(id); + return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); + } + + /** + * Creates a new Remote Process Group with the given ID that points to the + * given URI + * + * @param id + * @param uri + * @return + * + * @throws NullPointerException if either argument is null + * @throws IllegalArgumentException if <code>uri</code> is not a valid URI. + */ + public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) { + return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext); + } + + /** + * Verifies that no output port exists with the given id or name. If this + * does not hold true, throws an IllegalStateException + * + * @param id + * @throws IllegalStateException + */ + private void verifyPortIdDoesNotExist(final String id) { + Port port = rootGroup.findOutputPort(id); + if (port != null) { + throw new IllegalStateException("An Input Port already exists with ID " + id); + } + port = rootGroup.findInputPort(id); + if (port != null) { + throw new IllegalStateException("An Input Port already exists with ID " + id); + } + } + + /** + * @return the name of this controller, which is also the name of the Root + * Group. + */ + public String getName() { + readLock.lock(); + try { + return rootGroup.getName(); + } finally { + readLock.unlock(); + } + } + + /** + * Sets the name for the Root Group, which also changes the name for the + * controller. + * + * @param name + */ + public void setName(final String name) { + readLock.lock(); + try { + rootGroup.setName(name); + } finally { + readLock.unlock(); + } + } + + /** + * Gets the comments of this controller, which is also the comment of the + * Root Group. + * + * @return + */ + public String getComments() { + readLock.lock(); + try { + return rootGroup.getComments(); + } finally { + readLock.unlock(); + } + } + + /** + * Sets the comment for the Root Group, which also changes the comment for + * the controller. + * + * @param comments + */ + public void setComments(final String comments) { + readLock.lock(); + try { + rootGroup.setComments(comments); + } finally { + readLock.unlock(); + } + } + + /** + * @return <code>true</code> if the scheduling engine for this controller + * has been terminated. + */ + public boolean isTerminated() { + this.readLock.lock(); + try { + return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated()); + } finally { + this.readLock.unlock(); + } + } + + /** + * Triggers the controller to begin shutdown, stopping all processors and + * terminating the scheduling engine. After calling this method, the + * {@link #isTerminated()} method will indicate whether or not the shutdown + * has finished. + * + * @param kill if <code>true</code>, attempts to stop all active threads, + * but makes no guarantee that this will happen + * + * @throws IllegalStateException if the controller is already stopped or + * currently in the processor of stopping + */ + public void shutdown(final boolean kill) { + this.shutdown = true; + stopAllProcessors(); + + writeLock.lock(); + try { + if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) { + throw new IllegalStateException("Controller already stopped or still stopping..."); + } + + if (kill) { + this.timerDrivenEngineRef.get().shutdownNow(); + this.eventDrivenEngineRef.get().shutdownNow(); + LOG.info("Initiated immediate shutdown of flow controller..."); + } else { + this.timerDrivenEngineRef.get().shutdown(); + this.eventDrivenEngineRef.get().shutdown(); + LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds"); + } + + clusterTaskExecutor.shutdown(); + + // Trigger any processors' methods marked with @OnShutdown to be called + rootGroup.shutdown(); + + try { + this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS); + this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + LOG.info("Interrupted while waiting for controller termination."); + } + + try { + flowFileRepository.close(); + } catch (final Throwable t) { + LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t}); + } + + if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) { + LOG.info("Controller has been terminated successfully."); + } else { + LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop. Might need to kill the program manually."); + } + + if (externalSiteListener != null) { + externalSiteListener.stop(); + } + + if (flowFileSwapManager != null) { + flowFileSwapManager.shutdown(); + } + + if ( processScheduler != null ) { + processScheduler.shutdown(); + } + + if ( contentRepository != null ) { + contentRepository.shutdown(); + } + + if ( provenanceEventRepository != null ) { + try { + provenanceEventRepository.close(); + } catch (final IOException ioe) { + LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString()); + if ( LOG.isDebugEnabled() ) { + LOG.warn("", ioe); + } + } + } + } finally { + writeLock.unlock(); + } + } + + /** + * Serializes the current state of the controller to the given OutputStream + * + * @param serializer + * @param os + * @throws FlowSerializationException if serialization of the flow fails for + * any reason + */ + public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { + readLock.lock(); + try { + serializer.serialize(this, os); + } finally { + readLock.unlock(); + } + } + + /** + * Synchronizes this controller with the proposed flow. + * + * For more details, see + * {@link FlowSynchronizer#sync(FlowController, DataFlow)}. + * + * @param synchronizer + * @param dataFlow the flow to load the controller with. If the flow is null + * or zero length, then the controller must not have a flow or else an + * UninheritableFlowException will be thrown. + * + * @throws FlowSerializationException if proposed flow is not a valid flow + * configuration file + * @throws UninheritableFlowException if the proposed flow cannot be loaded + * by the controller because in doing so would risk orphaning flow files + * @throws FlowSynchronizationException if updates to the controller failed. + * If this exception is thrown, then the controller should be considered + * unsafe to be used + */ + public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow) + throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + writeLock.lock(); + try { + LOG.debug("Synchronizing controller with proposed flow"); + synchronizer.sync(this, dataFlow, encryptor); + LOG.info("Successfully synchronized controller with proposed flow"); + } finally { + writeLock.unlock(); + } + } + + /** + * @return the currently configured maximum number of threads that can be + * used for executing processors at any given time. + */ + public int getMaxTimerDrivenThreadCount() { + return maxTimerDrivenThreads.get(); + } + + public int getMaxEventDrivenThreadCount() { + return maxEventDrivenThreads.get(); + } + + public void setMaxTimerDrivenThreadCount(final int maxThreadCount) { + writeLock.lock(); + try { + setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads); + } finally { + writeLock.unlock(); + } + } + + public void setMaxEventDrivenThreadCount(final int maxThreadCount) { + writeLock.lock(); + try { + setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads); + processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount); + } finally { + writeLock.unlock(); + } + } + + /** + * Updates the number of threads that can be simultaneously used for + * executing processors. + * + * @param maxThreadCount + * + * This method must be called while holding the write lock! + */ + private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) { + if (maxThreadCount < 1) { + throw new IllegalArgumentException(); + } + + maxThreads.getAndSet(maxThreadCount); + if (null != engine && engine.getCorePoolSize() < maxThreadCount) { + engine.setCorePoolSize(maxThreads.intValue()); + } + } + + /** + * @return the ID of the root group + */ + public String getRootGroupId() { + readLock.lock(); + try { + return rootGroup.getIdentifier(); + } finally { + readLock.unlock(); + } + } + + /** + * Sets the root group to the given group + * + * @param group the ProcessGroup that is to become the new Root Group + * + * @throws IllegalArgumentException if the ProcessGroup has a parent + * @throws IllegalStateException if the FlowController does not know about + * the given process group + */ + void setRootGroup(final ProcessGroup group) { + if (requireNonNull(group).getParent() != null) { + throw new IllegalArgumentException("A ProcessGroup that has a parent cannot be the Root Group"); + } + + writeLock.lock(); + try { + rootGroup = group; + + if (externalSiteListener != null) { + externalSiteListener.setRootGroup(group); + } + + // update the heartbeat bean + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected)); + } finally { + writeLock.unlock(); + } + } + + public SystemDiagnostics getSystemDiagnostics() { + final SystemDiagnosticsFactory factory = new SystemDiagnosticsFactory(); + return factory.create(flowFileRepository, contentRepository); + } + + // + // ProcessGroup access + // + /** + * Updates the process group corresponding to the specified DTO. Any field + * in DTO that is <code>null</code> (with the exception of the required ID) + * will be ignored. + * + * @param dto + * @return a fully-populated DTO representing the newly updated ProcessGroup + * @throws ProcessorInstantiationException + * + * @throws IllegalStateException if no process group can be found with the + * ID of DTO or with the ID of the DTO's parentGroupId, if the template ID + * specified is invalid, or if the DTO's Parent Group ID changes but the + * parent group has incoming or outgoing connections + * + * @throws NullPointerException if the DTO or its ID is null + */ + public void updateProcessGroup(final ProcessGroupDTO dto) throws ProcessorInstantiationException { + final ProcessGroup group = lookupGroup(requireNonNull(dto).getId()); + + final String name = dto.getName(); + final PositionDTO position = dto.getPosition(); + final String comments = dto.getComments(); + + if (name != null) { + group.setName(name); + } + if (position != null) { + group.setPosition(toPosition(position)); + } + if (comments != null) { + group.setComments(comments); + } + } + + // + // Template access + // + /** + * Adds a template to this controller. The contents of this template must be + * part of the current flow. This is going create a template based on a + * snippet of this flow. + * + * @param dto + * @return a copy of the given DTO + * @throws IOException if an I/O error occurs when persisting the Template + * @throws NullPointerException if the DTO is null + * @throws IllegalArgumentException if does not contain all required + * information, such as the template name or a processor's configuration + * element + */ + public Template addTemplate(final TemplateDTO dto) throws IOException { + return templateManager.addTemplate(dto); + } + + /** + * Removes all templates from this controller + * + * @throws IOException + */ + public void clearTemplates() throws IOException { + templateManager.clear(); + } + + /** + * Imports the specified template into this controller. The contents of this + * template may have come from another NiFi instance. + * + * @param dto + * @return + * @throws IOException + */ + public Template importTemplate(final TemplateDTO dto) throws IOException { + return templateManager.importTemplate(dto); + } + + /** + * Returns the template with the given ID, or <code>null</code> if no + * template exists with the given ID. + * + * @param id + * @return + */ + public Template getTemplate(final String id) { + return templateManager.getTemplate(id); + } + + public TemplateManager getTemplateManager() { + return templateManager; + } + + /** + * Returns all templates that this controller knows about. + * + * @return + */ + public Collection<Template> getTemplates() { + return templateManager.getTemplates(); + } + + /** + * Removes the template with the given ID. + * + * @param id the ID of the template to remove + * @throws NullPointerException if the argument is null + * @throws IllegalStateException if no template exists with the given ID + * @throws IOException if template could not be removed + */ + public void removeTemplate(final String id) throws IOException, IllegalStateException { + templateManager.removeTemplate(id); + } + + private Position toPosition(final PositionDTO dto) { + return new Position(dto.getX(), dto.getY()); + } + + // + // Snippet + // + /** + * Creates an instance of the given snippet and adds the components to the + * given group + * + * @param group + * @param dto + * + * @throws NullPointerException if either argument is null + * @throws IllegalStateException if the snippet is not valid because a + * component in the snippet has an ID that is not unique to this flow, or + * because it shares an Input Port or Output Port at the root level whose + * name already exists in the given ProcessGroup, or because the Template + * contains a Processor or a Prioritizer whose class is not valid within + * this instance of NiFi. + * @throws ProcessorInstantiationException if unable to instantiate a + * processor + */ + public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException { + writeLock.lock(); + try { + validateSnippetContents(requireNonNull(group), dto); + + // + // Instantiate the labels + // + for (final LabelDTO labelDTO : dto.getLabels()) { + final Label label = createLabel(labelDTO.getId(), labelDTO.getLabel()); + label.setPosition(toPosition(labelDTO.getPosition())); + if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) { + label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); + } + + // TODO: Update the label's "style" + group.addLabel(label); + } + + // + // Instantiate the funnels + for (final FunnelDTO funnelDTO : dto.getFunnels()) { + final Funnel funnel = createFunnel(funnelDTO.getId()); + funnel.setPosition(toPosition(funnelDTO.getPosition())); + group.addFunnel(funnel); + } + + // + // Instantiate Input Ports & Output Ports + // + for (final PortDTO portDTO : dto.getInputPorts()) { + final Port inputPort; + if (group.isRootGroup()) { + inputPort = createRemoteInputPort(portDTO.getId(), portDTO.getName()); + inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount()); + if (portDTO.getGroupAccessControl() != null) { + ((RootGroupPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl()); + } + if (portDTO.getUserAccessControl() != null) { + ((RootGroupPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl()); + } + } else { + inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName()); + } + + inputPort.setPosition(toPosition(portDTO.getPosition())); + inputPort.setProcessGroup(group); + inputPort.setComments(portDTO.getComments()); + group.addInputPort(inputPort); + } + + for (final PortDTO portDTO : dto.getOutputPorts()) { + final Port outputPort; + if (group.isRootGroup()) { + outputPort = createRemoteOutputPort(portDTO.getId(), portDTO.getName()); + outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount()); + if (portDTO.getGroupAccessControl() != null) { + ((RootGroupPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl()); + } + if (portDTO.getUserAccessControl() != null) { + ((RootGroupPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl()); + } + } else { + outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName()); + } + + outputPort.setPosition(toPosition(portDTO.getPosition())); + outputPort.setProcessGroup(group); + outputPort.setComments(portDTO.getComments()); + group.addOutputPort(outputPort); + } + + // + // Instantiate the processors + // + for (final ProcessorDTO processorDTO : dto.getProcessors()) { + final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId()); + + procNode.setPosition(toPosition(processorDTO.getPosition())); + procNode.setProcessGroup(group); + + final ProcessorConfigDTO config = processorDTO.getConfig(); + procNode.setComments(config.getComments()); + if (config.isLossTolerant() != null) { + procNode.setLossTolerant(config.isLossTolerant()); + } + procNode.setName(processorDTO.getName()); + + procNode.setYieldPeriod(config.getYieldDuration()); + procNode.setPenalizationPeriod(config.getPenaltyDuration()); + procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel())); + procNode.setAnnotationData(config.getAnnotationData()); + procNode.setStyle(processorDTO.getStyle()); + + if (config.getRunDurationMillis() != null) { + procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS); + } + + if (config.getSchedulingStrategy() != null) { + procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); + } + + // ensure that the scheduling strategy is set prior to these values + procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); + procNode.setScheduldingPeriod(config.getSchedulingPeriod()); + + final Set<Relationship> relationships = new HashSet<>(); + if (processorDTO.getRelationships() != null) { + for (final RelationshipDTO rel : processorDTO.getRelationships()) { + if (rel.isAutoTerminate()) { + relationships.add(procNode.getRelationship(rel.getName())); + } + } + procNode.setAutoTerminatedRelationships(relationships); + } + + if (config.getProperties() != null) { + for (Map.Entry<String, String> entry : config.getProperties().entrySet()) { + if (entry.getValue() != null) { + procNode.setProperty(entry.getKey(), entry.getValue()); + } + } + } + + group.addProcessor(procNode); + } + + // + // Instantiate Remote Process Groups + // + for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) { + final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri()); + remoteGroup.setComments(remoteGroupDTO.getComments()); + remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition())); + remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout()); + remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration()); + remoteGroup.setProcessGroup(group); + + // set the input/output ports + if (remoteGroupDTO.getContents() != null) { + final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents(); + + // ensure there input ports + if (contents.getInputPorts() != null) { + remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts())); + } + + // ensure there are output ports + if (contents.getOutputPorts() != null) { + remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts())); + } + } + + group.addRemoteProcessGroup(remoteGroup); + } + + // + // Instantiate ProcessGroups + // + for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) { + final ProcessGroup childGroup = createProcessGroup(groupDTO.getId()); + childGroup.setParent(group); + childGroup.setPosition(toPosition(groupDTO.getPosition())); + childGroup.setComments(groupDTO.getComments()); + childGroup.setName(groupDTO.getName()); + group.addProcessGroup(childGroup); + + final FlowSnippetDTO contents = groupDTO.getContents(); + + // we want this to be recursive, so we will create a new template that contains only + // the contents of this child group and recursively call ourselves. + final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO(); + childTemplateDTO.setConnections(contents.getConnections()); + childTemplateDTO.setInputPorts(contents.getInputPorts()); + childTemplateDTO.setLabels(contents.getLabels()); + childTemplateDTO.setOutputPorts(contents.getOutputPorts()); + childTemplateDTO.setProcessGroups(contents.getProcessGroups()); + childTemplateDTO.setProcessors(contents.getProcessors()); + childTemplateDTO.setFunnels(contents.getFunnels()); + childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups()); + instantiateSnippet(childGroup, childTemplateDTO); + } + + // + // Instantiate Connections + // + for (final ConnectionDTO connectionDTO : dto.getConnections()) { + final ConnectableDTO sourceDTO = connectionDTO.getSource(); + final ConnectableDTO destinationDTO = connectionDTO.getDestination(); + final Connectable source; + final Connectable destination; + + // locate the source and destination connectable. if this is a remote port + // we need to locate the remote process groups. otherwise we need to + // find the connectable given its parent group. + // NOTE: (getConnectable returns ANY connectable, when the parent is + // not this group only input ports or output ports should be returned. if something + // other than a port is returned, an exception will be thrown when adding the + // connection below.) + // see if the source connectable is a remote port + if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) { + final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId()); + source = remoteGroup.getOutputPort(sourceDTO.getId()); + } else { + final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId()); + source = sourceGroup.getConnectable(sourceDTO.getId()); + } + + // see if the destination connectable is a remote port + if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) { + final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId()); + destination = remoteGroup.getInputPort(destinationDTO.getId()); + } else { + final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId()); + destination = destinationGroup.getConnectable(destinationDTO.getId()); + } + + // determine the selection relationships for this connection + final Set<String> relationships = new HashSet<>(); + if (connectionDTO.getSelectedRelationships() != null) { + relationships.addAll(connectionDTO.getSelectedRelationships()); + } + + final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships); + + if (connectionDTO.getBends() != null) { + final List<Position> bendPoints = new ArrayList<>(); + for (final PositionDTO bend : connectionDTO.getBends()) { + bendPoints.add(new Position(bend.getX(), bend.getY())); + } + connection.setBendPoints(bendPoints); + } + + final FlowFileQueue queue = connection.getFlowFileQueue(); + queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold()); + queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold()); + queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration()); + + final List<String> prioritizers = connectionDTO.getPrioritizers(); + if (prioritizers != null) { + final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers); + final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>(); + for (final String className : newPrioritizersClasses) { + try { + newPrioritizers.add(createPrioritizer(className)); + } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e); + } + } + queue.setPriorities(newPrioritizers); + } + + connection.setProcessGroup(group); + group.addConnection(connection); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Converts a set of ports into a set of remote process group ports. + * + * @param ports + * @return + */ + private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) { + Set<RemoteProcessGroupPortDescriptor> remotePorts = null; + if (ports != null) { + remotePorts = new LinkedHashSet<>(ports.size()); + for (RemoteProcessGroupPortDTO port : ports) { + final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); + descriptor.setId(port.getId()); + descriptor.setName(port.getName()); + descriptor.setComments(port.getComments()); + descriptor.setTargetRunning(port.isTargetRunning()); + descriptor.setConnected(port.isConnected()); + descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount()); + descriptor.setTransmitting(port.isTransmitting()); + descriptor.setUseCompression(port.getUseCompression()); + remotePorts.add(descriptor); + } + } + return remotePorts; + } + + /** + * Returns the parent of the specified Connectable. This only considers this + * group and any direct child sub groups. + * + * @param parentGroupId + * @return + */ + private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) { + if (areGroupsSame(group.getIdentifier(), parentGroupId)) { + return group; + } else { + return group.getProcessGroup(parentGroupId); + } + } + + /** + * <p> + * Verifies that the given DTO is valid, according to the following: + * + * <ul> + * <li>None of the ID's in any component of the DTO can be used in this + * flow.</li> + * <li>The ProcessGroup to which the template's contents will be added must + * not contain any InputPort or OutputPort with the same name as one of the + * corresponding components in the root level of the template.</li> + * <li>All Processors' classes must exist in this instance.</li> + * <li>All Flow File Prioritizers' classes must exist in this instance.</li> + * </ul> + * </p> + * + * <p> + * If any of the above statements does not hold true, an + * {@link IllegalStateException} or a + * {@link ProcessorInstantiationException} will be thrown. + * </p> + * + * @param group + * @param templateContents + */ + private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) { + // validate the names of Input Ports + for (final PortDTO port : templateContents.getInputPorts()) { + if (group.getInputPortByName(port.getName()) != null) { + throw new IllegalStateException("ProcessGroup already has an Input Port with name " + port.getName()); + } + } + + // validate the names of Output Ports + for (final PortDTO port : templateContents.getOutputPorts()) { + if (group.getOutputPortByName(port.getName()) != null) { + throw new IllegalStateException("ProcessGroup already has an Output Port with name " + port.getName()); + } + } + + // validate that all Processor Types and Prioritizer Types are valid + final List<String> processorClasses = new ArrayList<>(); + for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) { + processorClasses.add(c.getName()); + } + final List<String> prioritizerClasses = new ArrayList<>(); + for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) { + prioritizerClasses.add(c.getName()); + } + + final Set<ProcessorDTO> allProcs = new HashSet<>(); + final Set<ConnectionDTO> allConns = new HashSet<>(); + allProcs.addAll(templateContents.getProcessors()); + allConns.addAll(templateContents.getConnections()); + for (final ProcessGroupDTO childGroup : templateContents.getProcessGroups()) { + allProcs.addAll(findAllProcessors(childGroup)); + allConns.addAll(findAllConnections(childGroup)); + } + + for (final ProcessorDTO proc : allProcs) { + if (!processorClasses.contains(proc.getType())) { + throw new IllegalStateException("Invalid Processor Type: " + proc.getType()); + } + } + + for (final ConnectionDTO conn : allConns) { + final List<String> prioritizers = conn.getPrioritizers(); + if (prioritizers != null) { + for (final String prioritizer : prioritizers) { + if (!prioritizerClasses.contains(prioritizer)) { + throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer); + } + } + } + } + } + + /** + * Recursively finds all ProcessorDTO's + * + * @param group + * @return + */ + private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) { + final Set<ProcessorDTO> procs = new HashSet<>(); + for (final ProcessorDTO dto : group.getContents().getProcessors()) { + procs.add(dto); + } + + for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) { + procs.addAll(findAllProcessors(childGroup)); + } + return procs; + } + + /** + * Recursively finds all ConnectionDTO's + * + * @param group + * @return + */ + private Set<ConnectionDTO> findAllConnections(final ProcessGroupDTO group) { + final Set<ConnectionDTO> conns = new HashSet<>(); + for (final ConnectionDTO dto : group.getContents().getConnections()) { + conns.add(dto); + } + + for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) { + conns.addAll(findAllConnections(childGroup)); + } + return conns; + } + + // + // Processor access + // + /** + * Indicates whether or not the two ID's point to the same ProcessGroup. If + * either id is null, will return <code>false</code. + * + * @param id1 + * @param id2 + * @return + */ + public boolean areGroupsSame(final String id1, final String id2) { + if (id1 == null || id2 == null) { + return false; + } else if (id1.equals(id2)) { + return true; + } else { + final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1); + final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2); + return (comparable1.equals(comparable2)); + } + } + + public FlowFilePrioritizer createPrioritizer(final String type) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + FlowFilePrioritizer prioritizer; + + final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); + final Class<?> rawClass; + if (detectedClassLoaderForType == null) { + // try to find from the current class loader + rawClass = Class.forName(type); + } else { + // try to find from the registered classloader for that type + rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); + } + + Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); + final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class); + final Object processorObj = prioritizerClass.newInstance(); + prioritizer = prioritizerClass.cast(processorObj); + + return prioritizer; + } finally { + if (ctxClassLoader != null) { + Thread.currentThread().setContextClassLoader(ctxClassLoader); + } + } + } + + // + // InputPort access + // + public PortDTO updateInputPort(final String parentGroupId, final PortDTO dto) { + final ProcessGroup parentGroup = lookupGroup(parentGroupId); + final Port port = parentGroup.getInputPort(dto.getId()); + if (port == null) { + throw new IllegalStateException("No Input Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId); + } + + final String name = dto.getName(); + if (dto.getPosition() != null) { + port.setPosition(toPosition(dto.getPosition())); + } + + if (name != null) { + port.setName(name); + } + + return createDTO(port); + } + + private PortDTO createDTO(final Port port) { + if (port == null) { + return null; + } + + final PortDTO dto = new PortDTO(); + dto.setId(port.getIdentifier()); + dto.setPosition(new PositionDTO(port.getPosition().getX(), port.getPosition().getY())); + dto.setName(port.getName()); + dto.setParentGroupId(port.getProcessGroup().getIdentifier()); + + return dto; + } + + // + // OutputPort access + // + public PortDTO updateOutputPort(final String parentGroupId, final PortDTO dto) { + final ProcessGroup parentGroup = lookupGroup(parentGroupId); + final Port port = parentGroup.getOutputPort(dto.getId()); + if (port == null) { + throw new IllegalStateException("No Output Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId); + } + + final String name = dto.getName(); + if (name != null) { + port.setName(name); + } + + if (dto.getPosition() != null) { + port.setPosition(toPosition(dto.getPosition())); + } + + return createDTO(port); + } + + // + // Processor/Prioritizer/Filter Class Access + // + @SuppressWarnings("rawtypes") + public Set<Class> getFlowFileProcessorClasses() { + return ExtensionManager.getExtensions(Processor.class); + } + + @SuppressWarnings("rawtypes") + public Set<Class> getFlowFileComparatorClasses() { + return ExtensionManager.getExtensions(FlowFilePrioritizer.class); + } + + /** + * Returns the ProcessGroup with the given ID + * + * @param id + * @return the process group or null if not group is found + */ + private ProcessGroup lookupGroup(final String id) { + final ProcessGroup group = getGroup(id); + if (group == null) { + throw new IllegalStateException("No Group with ID " + id + " exists"); + } + return group; + } + + /** + * Returns the ProcessGroup with the given ID + * + * @param id + * @return the process group or null if not group is found + */ + public ProcessGroup getGroup(final String id) { + requireNonNull(id); + final ProcessGroup root; + readLock.lock(); + try { + root = rootGroup; + } finally { + readLock.unlock(); + } + + final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id; + return (root == null) ? null : root.findProcessGroup(searchId); + } + + @Override + public ProcessGroupStatus getControllerStatus() { + return getGroupStatus(getRootGroupId()); + } + + public ProcessGroupStatus getGroupStatus(final String groupId) { + return getGroupStatus(groupId, getProcessorStats()); + } + + public ProcessGroupStatus getGroupStatus(final String groupId, final RepositoryStatusReport statusReport) { + final ProcessGroup group = getGroup(groupId); + return getGroupStatus(group, statusReport); + } + + public ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport) { + if (group == null) { + return null; + } + + final ProcessGroupStatus status = new ProcessGroupStatus(); + status.setId(group.getIdentifier()); + status.setName(group.getName()); + status.setCreationTimestamp(new Date().getTime()); + int activeGroupThreads = 0; + long bytesRead = 0L; + long bytesWritten = 0L; + int queuedCount = 0; + long queuedContentSize = 0L; + int flowFilesIn = 0; + long bytesIn = 0L; + int flowFilesOut = 0; + long bytesOut = 0L; + int flowFilesReceived = 0; + long bytesReceived = 0L; + int flowFilesSent = 0; + long bytesSent = 0L; + + // set status for processors + final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>(); + status.setProcessorStatus(processorStatusCollection); +
<TRUNCATED>
