http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 8879726..0417693 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -238,7 +238,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import static java.util.Objects.requireNonNull; - +import static java.util.Objects.requireNonNull; public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider { @@ -324,7 +324,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ private final StringEncryptor encryptor; - private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks", true); private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager(); @@ -376,19 +375,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final Authorizer authorizer, final AuditService auditService, final StringEncryptor encryptor, - final BulletinRepository bulletinRepo, VariableRegistry variableRegistry) { + final BulletinRepository bulletinRepo, + final VariableRegistry variableRegistry) { return new FlowController( - flowFileEventRepo, - properties, - authorizer, - auditService, - encryptor, - /* configuredForClustering */ false, - /* NodeProtocolSender */ null, - bulletinRepo, - /* cluster coordinator */ null, - /* heartbeat monitor */ null, variableRegistry); + flowFileEventRepo, + properties, + authorizer, + auditService, + encryptor, + /* configuredForClustering */ false, + /* NodeProtocolSender */ null, + bulletinRepo, + /* cluster coordinator */ null, + /* heartbeat monitor */ null, + /* variable registry */ variableRegistry); } public static FlowController createClusteredInstance( @@ -404,16 +405,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R VariableRegistry variableRegistry) { final FlowController flowController = new FlowController( - flowFileEventRepo, - properties, - authorizer, - auditService, - encryptor, - /* configuredForClustering */ true, - protocolSender, - bulletinRepo, - clusterCoordinator, - heartbeatMonitor, variableRegistry); + flowFileEventRepo, + properties, + authorizer, + auditService, + encryptor, + /* configuredForClustering */ true, + protocolSender, + bulletinRepo, + clusterCoordinator, + heartbeatMonitor, variableRegistry); return flowController; } @@ -429,7 +430,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final BulletinRepository bulletinRepo, final ClusterCoordinator clusterCoordinator, final HeartbeatMonitor heartbeatMonitor, - VariableRegistry variableRegistry) { + final VariableRegistry variableRegistry) { maxTimerDrivenThreads = new AtomicInteger(10); maxEventDrivenThreads = new AtomicInteger(5); @@ -447,11 +448,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, resourceClaimManager); flowFileRepository = flowFileRepo; flowFileEventRepository = flowFileEventRepo; - counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); + counterRepositoryRef = new AtomicReference<>(new StandardCounterRepository()); bulletinRepository = bulletinRepo; - this.variableRegistry = variableRegistry; - + this.variableRegistry = variableRegistry == null ? VariableRegistry.EMPTY_REGISTRY : variableRegistry; try { this.provenanceRepository = createProvenanceRepository(properties); @@ -477,7 +477,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( - eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, this.variableRegistry)); + eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, this.variableRegistry)); final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); @@ -595,7 +595,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R registerForClusterCoordinator(); } else { LOG.info("The Elected Cluster Coordinator is {}. Will not register to be elected for this role until after connecting " - + "to the cluster and inheriting the cluster's flow.", electedCoordinatorNodeId); + + "to the cluster and inheriting the cluster's flow.", electedCoordinatorNodeId); } leaderElectionManager.start(); @@ -623,7 +623,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION); if (implementationClassName == null) { throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION); + + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION); } try { @@ -750,7 +750,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * <p> - * Causes any processors that were added to the flow with a 'delayStart' flag of true to now start + * Causes any processors that were added to the flow with a 'delayStart' + * flag of true to now start * </p> * * @param startDelayedComponents true if start @@ -772,7 +773,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startConnectable(connectable); } } catch (final Throwable t) { - LOG.error("Unable to start {} due to {}", new Object[] {connectable, t.toString()}); + LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()}); if (LOG.isDebugEnabled()) { LOG.error("", t); } @@ -787,7 +788,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort); startedTransmitting++; } catch (final Throwable t) { - LOG.error("Unable to start transmitting with {} due to {}", new Object[] {remoteGroupPort, t}); + LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t}); } } @@ -802,7 +803,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startConnectable(connectable); } } catch (final Throwable t) { - LOG.error("Unable to start {} due to {}", new Object[] {connectable, t}); + LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); } } @@ -818,7 +819,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION); if (implementationClassName == null) { throw new RuntimeException("Cannot create Content Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION); + + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION); } try { @@ -836,7 +837,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION); if (implementationClassName == null) { throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS); + + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS); } try { @@ -850,7 +851,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); if (implementationClassName == null) { throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); + + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); } try { @@ -864,14 +865,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Creates a connection between two Connectable objects. * * @param id required ID of the connection - * @param name the name of the connection, or <code>null</code> to leave the connection unnamed + * @param name the name of the connection, or <code>null</code> to leave the + * connection unnamed * @param source required source * @param destination required destination * @param relationshipNames required collection of relationship names * @return * - * @throws NullPointerException if the ID, source, destination, or set of relationships is null. - * @throws IllegalArgumentException if <code>relationships</code> is an empty collection + * @throws NullPointerException if the ID, source, destination, or set of + * relationships is null. + * @throws IllegalArgumentException if <code>relationships</code> is an + * empty collection */ public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) { final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler); @@ -948,7 +952,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @param name port name * @return new port * @throws NullPointerException if the ID or name is not unique - * @throws IllegalStateException if an Input Port already exists with the same name or id. + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. */ public Port createLocalInputPort(String id, String name) { id = requireNonNull(id).intern(); @@ -964,7 +969,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @param name port name * @return new port * @throws NullPointerException if the ID or name is not unique - * @throws IllegalStateException if an Input Port already exists with the same name or id. + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. */ public Port createLocalOutputPort(String id, String name) { id = requireNonNull(id).intern(); @@ -986,14 +992,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * <p> - * Creates a new ProcessorNode with the given type and identifier and initializes it invoking the methods annotated with {@link OnAdded}. + * Creates a new ProcessorNode with the given type and identifier and + * initializes it invoking the methods annotated with {@link OnAdded}. * </p> * * @param type processor type * @param id processor id * @return new processor * @throws NullPointerException if either arg is null - * @throws ProcessorInstantiationException if the processor cannot be instantiated for any reason + * @throws ProcessorInstantiationException if the processor cannot be + * instantiated for any reason */ public ProcessorNode createProcessor(final String type, final String id) throws ProcessorInstantiationException { return createProcessor(type, id, true); @@ -1001,15 +1009,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * <p> - * Creates a new ProcessorNode with the given type and identifier and optionally initializes it. + * Creates a new ProcessorNode with the given type and identifier and + * optionally initializes it. * </p> * * @param type the fully qualified Processor class name * @param id the unique ID of the Processor - * @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true}, will invoke methods annotated with the {@link OnAdded} annotation. + * @param firstTimeAdded whether or not this is the first time this + * Processor is added to the graph. If {@code true}, will invoke methods + * annotated with the {@link OnAdded} annotation. * @return new processor node * @throws NullPointerException if either arg is null - * @throws ProcessorInstantiationException if the processor cannot be instantiated for any reason + * @throws ProcessorInstantiationException if the processor cannot be + * instantiated for any reason */ public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException { id = id.intern(); @@ -1093,7 +1105,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * @return the ExtensionManager used for instantiating Processors, Prioritizers, etc. + * @return the ExtensionManager used for instantiating Processors, + * Prioritizers, etc. */ public ExtensionManager getExtensionManager() { return extensionManager; @@ -1132,41 +1145,47 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Creates a Port to use as an Input Port for the root Process Group, which is used for Site-to-Site communications + * Creates a Port to use as an Input Port for the root Process Group, which + * is used for Site-to-Site communications * * @param id port id * @param name port name * @return new port * @throws NullPointerException if the ID or name is not unique - * @throws IllegalStateException if an Input Port already exists with the same name or id. + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. */ public Port createRemoteInputPort(String id, String name) { id = requireNonNull(id).intern(); name = requireNonNull(name).intern(); verifyPortIdDoesNotExist(id); return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, - authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); + authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); } /** - * Creates a Port to use as an Output Port for the root Process Group, which is used for Site-to-Site communications and will queue flow files waiting to be delivered to remote instances + * Creates a Port to use as an Output Port for the root Process Group, which + * is used for Site-to-Site communications and will queue flow files waiting + * to be delivered to remote instances * * @param id port id * @param name port name * @return new port * @throws NullPointerException if the ID or name is not unique - * @throws IllegalStateException if an Input Port already exists with the same name or id. + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. */ public Port createRemoteOutputPort(String id, String name) { id = requireNonNull(id).intern(); name = requireNonNull(name).intern(); verifyPortIdDoesNotExist(id); return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, - authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); + authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); } /** - * Creates a new Remote Process Group with the given ID that points to the given URI + * Creates a new Remote Process Group with the given ID that points to the + * given URI * * @param id group id * @param uri group uri @@ -1179,7 +1198,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Verifies that no output port exists with the given id or name. If this does not hold true, throws an IllegalStateException + * Verifies that no output port exists with the given id or name. If this + * does not hold true, throws an IllegalStateException * * @param id port identifier * @throws IllegalStateException port already exists @@ -1196,7 +1216,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * @return the name of this controller, which is also the name of the Root Group. + * @return the name of this controller, which is also the name of the Root + * Group. */ public String getName() { readLock.lock(); @@ -1208,7 +1229,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Sets the name for the Root Group, which also changes the name for the controller. + * Sets the name for the Root Group, which also changes the name for the + * controller. * * @param name of root group */ @@ -1222,7 +1244,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * @return the comments of this controller, which is also the comment of the Root Group + * @return the comments of this controller, which is also the comment of the + * Root Group */ public String getComments() { readLock.lock(); @@ -1236,7 +1259,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Sets the comments * - * @param comments for the Root Group, which also changes the comment for the controller + * @param comments for the Root Group, which also changes the comment for + * the controller */ public void setComments(final String comments) { readLock.lock(); @@ -1248,7 +1272,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * @return <code>true</code> if the scheduling engine for this controller has been terminated. + * @return <code>true</code> if the scheduling engine for this controller + * has been terminated. */ public boolean isTerminated() { this.readLock.lock(); @@ -1260,12 +1285,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Triggers the controller to begin shutdown, stopping all processors and terminating the scheduling engine. After calling this method, the {@link #isTerminated()} method will indicate whether or - * not the shutdown has finished. + * Triggers the controller to begin shutdown, stopping all processors and + * terminating the scheduling engine. After calling this method, the + * {@link #isTerminated()} method will indicate whether or not the shutdown + * has finished. * - * @param kill if <code>true</code>, attempts to stop all active threads, but makes no guarantee that this will happen + * @param kill if <code>true</code>, attempts to stop all active threads, + * but makes no guarantee that this will happen * - * @throws IllegalStateException if the controller is already stopped or currently in the processor of stopping + * @throws IllegalStateException if the controller is already stopped or + * currently in the processor of stopping */ public void shutdown(final boolean kill) { this.shutdown = true; @@ -1328,14 +1357,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { flowFileRepository.close(); } catch (final Throwable t) { - LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[] {t}); + LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t}); } if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) { LOG.info("Controller has been terminated successfully."); } else { LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that " - + "will take an indeterminate amount of time to stop. Might need to kill the program manually."); + + "will take an indeterminate amount of time to stop. Might need to kill the program manually."); } for (final RemoteSiteListener listener : externalSiteListeners) { @@ -1365,13 +1394,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - /** * Serializes the current state of the controller to the given OutputStream * * @param serializer serializer * @param os stream - * @throws FlowSerializationException if serialization of the flow fails for any reason + * @throws FlowSerializationException if serialization of the flow fails for + * any reason */ public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { readLock.lock(); @@ -1385,17 +1414,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Synchronizes this controller with the proposed flow. * - * For more details, see {@link FlowSynchronizer#sync(FlowController, DataFlow, StringEncryptor)}. + * For more details, see + * {@link FlowSynchronizer#sync(FlowController, DataFlow, StringEncryptor)}. * * @param synchronizer synchronizer - * @param dataFlow the flow to load the controller with. If the flow is null or zero length, then the controller must not have a flow or else an UninheritableFlowException will be thrown. + * @param dataFlow the flow to load the controller with. If the flow is null + * or zero length, then the controller must not have a flow or else an + * UninheritableFlowException will be thrown. * - * @throws FlowSerializationException if proposed flow is not a valid flow configuration file - * @throws UninheritableFlowException if the proposed flow cannot be loaded by the controller because in doing so would risk orphaning flow files - * @throws FlowSynchronizationException if updates to the controller failed. If this exception is thrown, then the controller should be considered unsafe to be used + * @throws FlowSerializationException if proposed flow is not a valid flow + * configuration file + * @throws UninheritableFlowException if the proposed flow cannot be loaded + * by the controller because in doing so would risk orphaning flow files + * @throws FlowSynchronizationException if updates to the controller failed. + * If this exception is thrown, then the controller should be considered + * unsafe to be used */ public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow) - throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { writeLock.lock(); try { LOG.debug("Synchronizing controller with proposed flow"); @@ -1407,7 +1443,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * @return the currently configured maximum number of threads that can be used for executing processors at any given time. + * @return the currently configured maximum number of threads that can be + * used for executing processors at any given time. */ public int getMaxTimerDrivenThreadCount() { return maxTimerDrivenThreads.get(); @@ -1437,9 +1474,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Updates the number of threads that can be simultaneously used for executing processors. + * Updates the number of threads that can be simultaneously used for + * executing processors. * - * @param maxThreadCount This method must be called while holding the write lock! + * @param maxThreadCount This method must be called while holding the write + * lock! */ private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) { if (maxThreadCount < 1) { @@ -1470,7 +1509,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @param group the ProcessGroup that is to become the new Root Group * * @throws IllegalArgumentException if the ProcessGroup has a parent - * @throws IllegalStateException if the FlowController does not know about the given process group + * @throws IllegalStateException if the FlowController does not know about + * the given process group */ void setRootGroup(final ProcessGroup group) { if (requireNonNull(group).getParent() != null) { @@ -1501,13 +1541,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // ProcessGroup access // /** - * Updates the process group corresponding to the specified DTO. Any field in DTO that is <code>null</code> (with the exception of the required ID) will be ignored. + * Updates the process group corresponding to the specified DTO. Any field + * in DTO that is <code>null</code> (with the exception of the required ID) + * will be ignored. * * @param dto group * @throws ProcessorInstantiationException * - * @throws IllegalStateException if no process group can be found with the ID of DTO or with the ID of the DTO's parentGroupId, if the template ID specified is invalid, or if the DTO's Parent - * Group ID changes but the parent group has incoming or outgoing connections + * @throws IllegalStateException if no process group can be found with the + * ID of DTO or with the ID of the DTO's parentGroupId, if the template ID + * specified is invalid, or if the DTO's Parent Group ID changes but the + * parent group has incoming or outgoing connections * * @throws NullPointerException if the DTO or its ID is null */ @@ -1529,7 +1573,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - private Position toPosition(final PositionDTO dto) { return new Position(dto.getX(), dto.getY()); } @@ -1538,15 +1581,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Snippet // /** - * Creates an instance of the given snippet and adds the components to the given group + * Creates an instance of the given snippet and adds the components to the + * given group * * @param group group * @param dto dto * * @throws NullPointerException if either argument is null - * @throws IllegalStateException if the snippet is not valid because a component in the snippet has an ID that is not unique to this flow, or because it shares an Input Port or Output Port at the - * root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi. - * @throws ProcessorInstantiationException if unable to instantiate a processor + * @throws IllegalStateException if the snippet is not valid because a + * component in the snippet has an ID that is not unique to this flow, or + * because it shares an Input Port or Output Port at the root level whose + * name already exists in the given ProcessGroup, or because the Template + * contains a Processor or a Prioritizer whose class is not valid within + * this instance of NiFi. + * @throws ProcessorInstantiationException if unable to instantiate a + * processor */ public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException { writeLock.lock(); @@ -1866,7 +1915,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the parent of the specified Connectable. This only considers this group and any direct child sub groups. + * Returns the parent of the specified Connectable. This only considers this + * group and any direct child sub groups. * * @param parentGroupId group id * @return parent group @@ -1884,16 +1934,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Verifies that the given DTO is valid, according to the following: * * <ul> - * <li>None of the ID's in any component of the DTO can be used in this flow.</li> - * <li>The ProcessGroup to which the template's contents will be added must not contain any InputPort or OutputPort with the same name as one of the corresponding components in the root level of - * the template.</li> + * <li>None of the ID's in any component of the DTO can be used in this + * flow.</li> + * <li>The ProcessGroup to which the template's contents will be added must + * not contain any InputPort or OutputPort with the same name as one of the + * corresponding components in the root level of the template.</li> * <li>All Processors' classes must exist in this instance.</li> * <li>All Flow File Prioritizers' classes must exist in this instance.</li> * </ul> * </p> * * <p> - * If any of the above statements does not hold true, an {@link IllegalStateException} or a {@link ProcessorInstantiationException} will be thrown. + * If any of the above statements does not hold true, an + * {@link IllegalStateException} or a + * {@link ProcessorInstantiationException} will be thrown. * </p> * * @param group group @@ -2004,7 +2058,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Processor access // /** - * Indicates whether or not the two ID's point to the same ProcessGroup. If either id is null, will return <code>false</code>. + * Indicates whether or not the two ID's point to the same ProcessGroup. If + * either id is null, will return <code>false</code>. * * @param id1 group id * @param id2 other group id @@ -2156,7 +2211,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the status of all components in the controller. This request is not in the context of a user so the results will be unfiltered. + * Returns the status of all components in the controller. This request is + * not in the context of a user so the results will be unfiltered. * * @return the component status */ @@ -2166,7 +2222,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the status of all components in the specified group. This request is not in the context of a user so the results will be unfiltered. + * Returns the status of all components in the specified group. This request + * is not in the context of a user so the results will be unfiltered. * * @param groupId group id * @return the component status @@ -2176,7 +2233,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the status for components in the specified group. This request is made by the specified user so the results will be filtered accordingly. + * Returns the status for components in the specified group. This request is + * made by the specified user so the results will be filtered accordingly. * * @param groupId group id * @param user user making request @@ -2187,8 +2245,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the status for the components in the specified group with the specified report. This request is not in the context of a user so the results - * will be unfiltered. + * Returns the status for the components in the specified group with the + * specified report. This request is not in the context of a user so the + * results will be unfiltered. * * @param groupId group id * @param statusReport report @@ -2202,8 +2261,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the status for the components in the specified group with the specified report. This request is made by the specified user - * so the results will be filtered accordingly. + * Returns the status for the components in the specified group with the + * specified report. This request is made by the specified user so the + * results will be filtered accordingly. * * @param groupId group id * @param statusReport report @@ -2218,8 +2278,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the status for the components in the specified group with the specified report. The results will be filtered by executing - * the specified predicate. + * Returns the status for the components in the specified group with the + * specified report. The results will be filtered by executing the specified + * predicate. * * @param group group id * @param statusReport report @@ -2795,7 +2856,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider,variableRegistry); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); final ReportingTaskNode taskNode; if (creationSuccessful) { taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry); @@ -2803,7 +2864,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type,variableRegistry); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry); } taskNode.setName(task.getClass().getSimpleName()); @@ -2811,7 +2872,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (firstTimeAdded) { final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), - SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this); + SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this); try { task.initialize(config); @@ -2832,7 +2893,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Register log observer to provide bulletins when reporting task logs anything at WARN level or above final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, - new ReportingTaskLogObserver(getBulletinRepository(), taskNode)); + new ReportingTaskLogObserver(getBulletinRepository(), taskNode)); return taskNode; } @@ -2903,7 +2964,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Register log observer to provide bulletins when reporting task logs anything at WARN level or above final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, - new ControllerServiceLogObserver(getBulletinRepository(), serviceNode)); + new ControllerServiceLogObserver(getBulletinRepository(), serviceNode)); if (firstTimeAdded) { final ControllerService service = serviceNode.getControllerServiceImplementation(); @@ -3028,7 +3089,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R service.verifyCanDelete(); try (final NarCloseable x = NarCloseable.withNarLoader()) { - final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null,variableRegistry); + final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); } @@ -3139,7 +3200,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Clustering methods // /** - * Starts heartbeating to the cluster. May only be called if the instance was constructed for a clustered environment. + * Starts heartbeating to the cluster. May only be called if the instance + * was constructed for a clustered environment. * * @throws IllegalStateException if not configured for clustering */ @@ -3161,23 +3223,31 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Notifies controller that the sending of heartbeats should be temporarily suspended. This method does not cancel any background tasks as does {@link #stopHeartbeating()} and does not require any - * lock on the FlowController. Background tasks will still generate heartbeat messages and any background task currently in the process of sending a Heartbeat to the cluster will continue. + * Notifies controller that the sending of heartbeats should be temporarily + * suspended. This method does not cancel any background tasks as does + * {@link #stopHeartbeating()} and does not require any lock on the + * FlowController. Background tasks will still generate heartbeat messages + * and any background task currently in the process of sending a Heartbeat + * to the cluster will continue. */ public void suspendHeartbeats() { heartbeatsSuspended.set(true); } /** - * Notifies controller that the sending of heartbeats should be re-enabled. This method does not submit any background tasks to take affect as does {@link #startHeartbeating()} and does not - * require any lock on the FlowController. + * Notifies controller that the sending of heartbeats should be re-enabled. + * This method does not submit any background tasks to take affect as does + * {@link #startHeartbeating()} and does not require any lock on the + * FlowController. */ public void resumeHeartbeats() { heartbeatsSuspended.set(false); } /** - * Stops heartbeating to the cluster. May only be called if the instance was constructed for a clustered environment. If the controller was not heartbeating, then this method has no effect. + * Stops heartbeating to the cluster. May only be called if the instance was + * constructed for a clustered environment. If the controller was not + * heartbeating, then this method has no effect. * * @throws IllegalStateException if not clustered */ @@ -3244,7 +3314,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * @return true if this instance is clustered; false otherwise. Clustered means that a node is either connected or trying to connect to the cluster. + * @return true if this instance is clustered; false otherwise. Clustered + * means that a node is either connected or trying to connect to the + * cluster. */ @Override public boolean isClustered() { @@ -3261,8 +3333,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * @return the DN of the Cluster Manager that we are currently connected to, if available. This will return null if the instance is not clustered or if the instance is clustered but the NCM's DN - * is not available - for instance, if cluster communications are not secure + * @return the DN of the Cluster Manager that we are currently connected to, + * if available. This will return null if the instance is not clustered or + * if the instance is clustered but the NCM's DN is not available - for + * instance, if cluster communications are not secure */ public String getClusterManagerDN() { readLock.lock(); @@ -3274,10 +3348,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Sets whether this instance is clustered. Clustered means that a node is either connected or trying to connect to the cluster. + * Sets whether this instance is clustered. Clustered means that a node is + * either connected or trying to connect to the cluster. * * @param clustered true if clustered - * @param clusterInstanceId if clustered is true, indicates the InstanceID of the Cluster Manager + * @param clusterInstanceId if clustered is true, indicates the InstanceID + * of the Cluster Manager */ public void setClustered(final boolean clustered, final String clusterInstanceId) { setClustered(clustered, clusterInstanceId, null); @@ -3320,10 +3396,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Sets whether this instance is clustered. Clustered means that a node is either connected or trying to connect to the cluster. + * Sets whether this instance is clustered. Clustered means that a node is + * either connected or trying to connect to the cluster. * * @param clustered true if clustered - * @param clusterInstanceId if clustered is true, indicates the InstanceID of the Cluster Manager + * @param clusterInstanceId if clustered is true, indicates the InstanceID + * of the Cluster Manager * @param clusterManagerDn the DN of the NCM */ public void setClustered(final boolean clustered, final String clusterInstanceId, final String clusterManagerDn) { @@ -3387,7 +3465,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * @return true if this instance is the primary node in the cluster; false otherwise + * @return true if this instance is the primary node in the cluster; false + * otherwise */ @Override public boolean isPrimary() { @@ -3469,17 +3548,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public boolean isContentSame() { return areEqual(event.getPreviousContentClaimContainer(), event.getContentClaimContainer()) - && areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection()) - && areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier()) - && areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset()) - && areEqual(event.getPreviousFileSize(), event.getFileSize()); + && areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection()) + && areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier()) + && areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset()) + && areEqual(event.getPreviousFileSize(), event.getFileSize()); } @Override public boolean isInputAvailable() { try { return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), - event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset())); + event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset())); } catch (final IOException e) { return false; } @@ -3489,7 +3568,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isOutputAvailable() { try { return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), - event.getContentClaimIdentifier(), event.getContentClaimOffset())); + event.getContentClaimIdentifier(), event.getContentClaimOffset())); } catch (final IOException e) { return false; } @@ -3526,7 +3605,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), - provEvent.getPreviousContentClaimIdentifier(), false); + provEvent.getPreviousContentClaimIdentifier(), false); claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset()); offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset(); size = provEvent.getPreviousFileSize(); @@ -3536,7 +3615,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), - provEvent.getContentClaimIdentifier(), false); + provEvent.getContentClaimIdentifier(), false); claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset()); offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset(); @@ -3548,18 +3627,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Register a Provenance Event to indicate that we replayed the data. final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder() - .setEventType(ProvenanceEventType.DOWNLOAD) - .setFlowFileUUID(provEvent.getFlowFileUuid()) - .setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap()) - .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size) - .setTransitUri(requestUri) - .setEventTime(System.currentTimeMillis()) - .setFlowFileEntryDate(provEvent.getFlowFileEntryDate()) - .setLineageStartDate(provEvent.getLineageStartDate()) - .setComponentType(getName()) - .setComponentId(getRootGroupId()) - .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId()) - .build(); + .setEventType(ProvenanceEventType.DOWNLOAD) + .setFlowFileUUID(provEvent.getFlowFileUuid()) + .setAttributes(provEvent.getAttributes(), Collections.<String, String>emptyMap()) + .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size) + .setTransitUri(requestUri) + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(provEvent.getFlowFileEntryDate()) + .setLineageStartDate(provEvent.getLineageStartDate()) + .setComponentType(getName()) + .setComponentId(getRootGroupId()) + .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId()) + .build(); provenanceRepository.registerEvent(sendEvent); @@ -3590,20 +3669,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Register a Provenance Event to indicate that we replayed the data. final StandardProvenanceEventRecord.Builder sendEventBuilder = new StandardProvenanceEventRecord.Builder() - .setEventType(ProvenanceEventType.DOWNLOAD) - .setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key())) - .setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap()) - .setTransitUri(requestUri) - .setEventTime(System.currentTimeMillis()) - .setFlowFileEntryDate(flowFile.getEntryDate()) - .setLineageStartDate(flowFile.getLineageStartDate()) - .setComponentType(getName()) - .setComponentId(getRootGroupId()) - .setDetails("Download of Content requested by " + requestor + " for " + flowFile); + .setEventType(ProvenanceEventType.DOWNLOAD) + .setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key())) + .setAttributes(flowFile.getAttributes(), Collections.<String, String>emptyMap()) + .setTransitUri(requestUri) + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(flowFile.getEntryDate()) + .setLineageStartDate(flowFile.getLineageStartDate()) + .setComponentType(getName()) + .setComponentId(getRootGroupId()) + .setDetails("Download of Content requested by " + requestor + " for " + flowFile); if (contentClaim != null) { sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), - contentClaim.getOffset() + flowFile.getContentClaimOffset(), flowFile.getSize()); + contentClaim.getOffset() + flowFile.getContentClaimOffset(), flowFile.getSize()); } final ProvenanceEventRecord sendEvent = sendEventBuilder.build(); @@ -3710,7 +3789,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Create the ContentClaim final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), - event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); + event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); // Increment Claimant Count, since we will now be referencing the Content Claim resourceClaimManager.incrementClaimantCount(resourceClaim); @@ -3737,22 +3816,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // To avoid this, we just always set the offset in the Content Claim itself and set the // FlowFileRecord's contentClaimOffset to 0. final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - // Copy relevant info from source FlowFile - .addAttributes(event.getPreviousAttributes()) - .contentClaim(contentClaim) - .contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself - .entryDate(System.currentTimeMillis()) - .id(flowFileRepository.getNextFlowFileSequence()) - .lineageStart(event.getLineageStartDate(), 0L) - .size(contentSize.longValue()) - // Create a new UUID and add attributes indicating that this is a replay - .addAttribute("flowfile.replay", "true") - .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date())) - .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID) - // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile - .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key()) - // build the record - .build(); + // Copy relevant info from source FlowFile + .addAttributes(event.getPreviousAttributes()) + .contentClaim(contentClaim) + .contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself + .entryDate(System.currentTimeMillis()) + .id(flowFileRepository.getNextFlowFileSequence()) + .lineageStart(event.getLineageStartDate(), 0L) + .size(contentSize.longValue()) + // Create a new UUID and add attributes indicating that this is a replay + .addAttribute("flowfile.replay", "true") + .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date())) + .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID) + // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile + .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key()) + // build the record + .build(); // Register a Provenance Event to indicate that we replayed the data. final ProvenanceEventRecord replayEvent = new StandardProvenanceEventRecord.Builder() @@ -3774,7 +3853,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord); record.setDestination(queue); - flowFileRepository.updateRepository(Collections.<RepositoryRecord> singleton(record)); + flowFileRepository.updateRepository(Collections.<RepositoryRecord>singleton(record)); // Enqueue the data queue.put(flowFileRecord); @@ -3817,8 +3896,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - private class HeartbeatSendTask implements Runnable { + private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US); @Override @@ -3848,10 +3927,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } heartbeatLogger.info("Heartbeat created at {} and sent to {} at {}; send took {} millis", - dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())), - heartbeatAddress, - dateFormatter.format(new Date()), - sendMillis); + dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())), + heartbeatAddress, + dateFormatter.format(new Date()), + sendMillis); } catch (final UnknownServiceAddressException usae) { if (heartbeatLogger.isDebugEnabled()) { heartbeatLogger.debug(usae.getMessage()); @@ -4028,8 +4107,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return queues; } - private static class HeartbeatBean { + private final ProcessGroup rootGroup; private final boolean primary;
http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FileBasedVariableRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FileBasedVariableRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FileBasedVariableRegistry.java new file mode 100644 index 0000000..c4079a7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FileBasedVariableRegistry.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.VariableRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A file based variable registry that loads all properties from files specified + * during construction and is backed by system properties and environment + * variables accessible to the JVM. + */ +public class FileBasedVariableRegistry implements VariableRegistry { + + private final static Logger LOG = LoggerFactory.getLogger(FileBasedVariableRegistry.class); + final Map<VariableDescriptor, String> map; + + public FileBasedVariableRegistry(final Path[] propertiesPaths) { + final Map<VariableDescriptor, String> newMap = new HashMap<>(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY.getVariableMap()); + final int systemEnvPropCount = newMap.size(); + int totalPropertiesLoaded = systemEnvPropCount; + LOG.info("Loaded {} properties from system properties and environment variables",systemEnvPropCount); + try { + for (final Path path : propertiesPaths) { + if (Files.exists(path)) { + final AtomicInteger propsLoaded = new AtomicInteger(0); + try (final InputStream inStream = new BufferedInputStream(new FileInputStream(path.toFile()))) { + Properties properties = new Properties(); + properties.load(inStream); + properties.entrySet().stream().forEach((entry) -> { + final VariableDescriptor desc = new VariableDescriptor.Builder(entry.getKey().toString()) + .description(path.toString()) + .sensitive(false) + .build(); + newMap.put(desc, entry.getValue().toString()); + propsLoaded.incrementAndGet(); + }); + } + totalPropertiesLoaded += propsLoaded.get(); + if(propsLoaded.get() > 0){ + LOG.info("Loaded {} properties from '{}'", propsLoaded.get(), path); + }else{ + LOG.warn("No properties loaded from '{}'", path); + } + } else { + LOG.warn("Skipping property file {} as it does not appear to exist", path); + } + } + } catch (final IOException ioe) { + LOG.error("Unable to complete variable registry loading from files due to ", ioe); + } + LOG.info("Loaded a total of {} properties. Including precedence overrides effective accessible registry key size is {}", totalPropertiesLoaded, newMap.size()); + map = newMap; + } + + @Override + public Map<VariableDescriptor, String> getVariableMap() { + return Collections.unmodifiableMap(map); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index 59c1b80..f03e26c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -28,7 +28,7 @@ <bean id="nifiProperties" class="org.apache.nifi.util.NiFiProperties" factory-method="getInstance"/> <!-- variable registry --> - <bean id="variableRegistry" class="org.apache.nifi.registry.VariableRegistryUtils" factory-method="createCustomVariableRegistry"> + <bean id="variableRegistry" class="org.apache.nifi.util.FileBasedVariableRegistry"> <constructor-arg type="java.nio.file.Path[]" value="#{nifiProperties.getVariableRegistryPropertiesPaths()}" /> </bean> http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index c864902..986fba4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -27,7 +27,6 @@ import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.registry.VariableRegistryUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -48,6 +47,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.nifi.util.FileBasedVariableRegistry; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -75,7 +75,7 @@ public class StandardFlowServiceTest { @Before public void setup() throws Exception { properties = NiFiProperties.getInstance(); - variableRegistry = VariableRegistryUtils.createCustomVariableRegistry(properties.getVariableRegistryPropertiesPaths()); + variableRegistry = new FileBasedVariableRegistry(properties.getVariableRegistryPropertiesPaths()); mockFlowFileEventRepository = mock(FlowFileEventRepository.class); authorizer = mock(Authorizer.class); mockAuditService = mock(AuditService.class); http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index e9c2c00..60530c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -34,7 +34,6 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.registry.VariableRegistryUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.NiFiProperties; import org.junit.After; @@ -45,6 +44,7 @@ import org.mockito.Mockito; import java.nio.charset.StandardCharsets; import java.util.LinkedHashSet; import java.util.Set; +import org.apache.nifi.util.FileBasedVariableRegistry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -114,7 +114,7 @@ public class TestFlowController { policies1.add(policy2); authorizer = new MockPolicyBasedAuthorizer(groups1, users1, policies1); - variableRegistry = VariableRegistryUtils.createCustomVariableRegistry(properties.getVariableRegistryPropertiesPaths()); + variableRegistry = new FileBasedVariableRegistry(properties.getVariableRegistryPropertiesPaths()); bulletinRepo = Mockito.mock(BulletinRepository.class); controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, authorizer, auditService, encryptor, bulletinRepo,variableRegistry); http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index a7137a1..602efe7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -43,7 +43,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.provenance.MockProvenanceRepository; -import org.apache.nifi.registry.VariableRegistryUtils; import org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Before; @@ -66,6 +65,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import org.apache.nifi.util.FileBasedVariableRegistry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -663,7 +663,7 @@ public class TestProcessorLifecycle { return FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), properties, mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(), - VariableRegistryUtils.createCustomVariableRegistry(properties.getVariableRegistryPropertiesPaths())); + new FileBasedVariableRegistry(properties.getVariableRegistryPropertiesPaths())); } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 3c3b5fd..e50cc27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -62,7 +62,6 @@ import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.registry.VariableRegistryUtils; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingContext; @@ -78,7 +77,7 @@ public class TestStandardProcessScheduler { private ReportingTaskNode taskNode = null; private TestReportingTask reportingTask = null; private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class); - private VariableRegistry variableRegistry = VariableRegistryUtils.createSystemVariableRegistry(); + private VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY; private FlowController controller; private ProcessGroup rootGroup; http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java index 9b551ce..77a1d8d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java @@ -23,8 +23,8 @@ import org.apache.nifi.controller.StandardFlowServiceTest; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.registry.VariableRegistryUtils; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.Before; import org.junit.BeforeClass; @@ -43,7 +43,7 @@ public class StandardControllerServiceProviderTest { NiFiProperties properties = NiFiProperties.getInstance(); NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory()); ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders()); - variableRegistry = VariableRegistryUtils.createCustomVariableRegistry(properties.getVariableRegistryPropertiesPaths()); + variableRegistry = new FileBasedVariableRegistry(properties.getVariableRegistryPropertiesPaths()); } @Before http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index f3ef85d..c35fd59 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -47,7 +47,6 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.registry.VariableRegistryUtils; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -77,7 +76,7 @@ public class TestStandardControllerServiceProvider { } }; - private static VariableRegistry variableRegistry = VariableRegistryUtils.createSystemVariableRegistry(); + private static VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY; @BeforeClass public static void setNiFiProps() { http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java index 19a8405..3a6310f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java @@ -32,8 +32,6 @@ import org.apache.nifi.components.state.StateProvider; import org.apache.nifi.components.state.StateProviderInitializationContext; import org.apache.nifi.controller.state.StateMapUpdate; import org.apache.nifi.controller.state.providers.AbstractTestStateProvider; -import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.registry.VariableRegistryUtils; import org.junit.After; import org.junit.Before; import org.wali.WriteAheadRepository; @@ -45,9 +43,8 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider @Before public void setup() throws IOException { provider = new WriteAheadLocalStateProvider(); - final VariableRegistry variableRegistry = VariableRegistryUtils.createSystemVariableRegistry(); final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>(); - properties.put(WriteAheadLocalStateProvider.PATH, new StandardPropertyValue("target/local-state-provider/" + UUID.randomUUID().toString(), null, variableRegistry)); + properties.put(WriteAheadLocalStateProvider.PATH, new StandardPropertyValue("target/local-state-provider/" + UUID.randomUUID().toString(), null)); provider.initialize(new StateProviderInitializationContext() { @Override @@ -64,7 +61,7 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider public PropertyValue getProperty(final PropertyDescriptor property) { final PropertyValue prop = properties.get(property); if (prop == null) { - return new StandardPropertyValue(null, null, variableRegistry); + return new StandardPropertyValue(null, null); } return prop; } http://git-wip-us.apache.org/repos/asf/nifi/blob/05a99a93/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java index c0ef068..8ce97fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java @@ -31,8 +31,6 @@ import org.apache.nifi.components.state.StateProvider; import org.apache.nifi.components.state.StateProviderInitializationContext; import org.apache.nifi.components.state.exception.StateTooLargeException; import org.apache.nifi.controller.state.providers.AbstractTestStateProvider; -import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.registry.VariableRegistryUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,7 +40,6 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider { private StateProvider provider; private TestingServer zkServer; - private VariableRegistry variableRegistry; private static final Map<PropertyDescriptor, String> defaultProperties = new HashMap<>(); @@ -61,7 +58,6 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider { final Map<PropertyDescriptor, String> properties = new HashMap<>(defaultProperties); properties.put(ZooKeeperStateProvider.CONNECTION_STRING, zkServer.getConnectString()); this.provider = createProvider(properties); - variableRegistry = VariableRegistryUtils.createSystemVariableRegistry(); } private void initializeProvider(final ZooKeeperStateProvider provider, final Map<PropertyDescriptor, String> properties) throws IOException { @@ -75,7 +71,7 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider { public Map<PropertyDescriptor, PropertyValue> getProperties() { final Map<PropertyDescriptor, PropertyValue> propValueMap = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { - propValueMap.put(entry.getKey(), new StandardPropertyValue(entry.getValue(), null, variableRegistry)); + propValueMap.put(entry.getKey(), new StandardPropertyValue(entry.getValue(), null)); } return propValueMap; } @@ -83,7 +79,7 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider { @Override public PropertyValue getProperty(final PropertyDescriptor property) { final String prop = properties.get(property); - return new StandardPropertyValue(prop, null, variableRegistry); + return new StandardPropertyValue(prop, null); } @Override
