http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 9ca8e30..1b14cf8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -158,6 +158,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RemoteResourceManager; @@ -286,6 +287,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final StateManagerProvider stateManagerProvider; private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>(); + private final VariableRegistry variableRegistry; private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>(); private volatile ZooKeeperStateServer zooKeeperStateServer; @@ -368,12 +370,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"); public static FlowController createStandaloneInstance( - final FlowFileEventRepository flowFileEventRepo, - final NiFiProperties properties, - final Authorizer authorizer, - final AuditService auditService, - final StringEncryptor encryptor, - final BulletinRepository bulletinRepo) { + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final Authorizer authorizer, + final AuditService auditService, + final StringEncryptor encryptor, + final BulletinRepository bulletinRepo, VariableRegistry variableRegistry) { + return new FlowController( flowFileEventRepo, properties, @@ -384,19 +387,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /* NodeProtocolSender */ null, bulletinRepo, /* cluster coordinator */ null, - /* heartbeat monitor */ null); + /* heartbeat monitor */ null, variableRegistry); } public static FlowController createClusteredInstance( - final FlowFileEventRepository flowFileEventRepo, - final NiFiProperties properties, - final Authorizer authorizer, - final AuditService auditService, - final StringEncryptor encryptor, - final NodeProtocolSender protocolSender, - final BulletinRepository bulletinRepo, - final ClusterCoordinator clusterCoordinator, - final HeartbeatMonitor heartbeatMonitor) { + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final Authorizer authorizer, + final AuditService auditService, + final StringEncryptor encryptor, + final NodeProtocolSender protocolSender, + final BulletinRepository bulletinRepo, + final ClusterCoordinator clusterCoordinator, + final HeartbeatMonitor heartbeatMonitor, + VariableRegistry variableRegistry) { + final FlowController flowController = new FlowController( flowFileEventRepo, properties, @@ -407,22 +412,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R protocolSender, bulletinRepo, clusterCoordinator, - heartbeatMonitor); + heartbeatMonitor, variableRegistry); return flowController; } private FlowController( - final FlowFileEventRepository flowFileEventRepo, - final NiFiProperties properties, - final Authorizer authorizer, - final AuditService auditService, - final StringEncryptor encryptor, - final boolean configuredForClustering, - final NodeProtocolSender protocolSender, - final BulletinRepository bulletinRepo, - final ClusterCoordinator clusterCoordinator, - final HeartbeatMonitor heartbeatMonitor) { + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final Authorizer authorizer, + final AuditService auditService, + final StringEncryptor encryptor, + final boolean configuredForClustering, + final NodeProtocolSender protocolSender, + final BulletinRepository bulletinRepo, + final ClusterCoordinator clusterCoordinator, + final HeartbeatMonitor heartbeatMonitor, + VariableRegistry variableRegistry) { maxTimerDrivenThreads = new AtomicInteger(10); maxEventDrivenThreads = new AtomicInteger(5); @@ -443,6 +449,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); bulletinRepository = bulletinRepo; + this.variableRegistry = variableRegistry; + try { this.provenanceRepository = createProvenanceRepository(properties); @@ -458,20 +466,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - this.stateManagerProvider = StandardStateManagerProvider.create(properties); + this.stateManagerProvider = StandardStateManagerProvider.create(properties, this.variableRegistry); } catch (final IOException e) { throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider); + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.variableRegistry); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( - eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); + eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, this.variableRegistry)); - final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); - final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); + final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); + final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); @@ -507,11 +515,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.snippetManager = new SnippetManager(); - rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor, this); + rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor, this, this.variableRegistry); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); instanceId = UUID.randomUUID().toString(); - controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider); + controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry); if (remoteInputSocketPort == null) { LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set"); @@ -971,7 +979,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws NullPointerException if the argument is null */ public ProcessGroup createProcessGroup(final String id) { - return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor, this); + return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor, this, variableRegistry); } /** @@ -1018,7 +1026,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R creationSuccessful = false; } - final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); final ProcessorNode procNode; if (creationSuccessful) { procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider); @@ -1295,7 +1303,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // invoke any methods annotated with @OnShutdown on Controller Services for (final ControllerServiceNode serviceNode : getAllControllerServices()) { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null); + final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext); } } @@ -2785,15 +2793,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider,variableRegistry); final ReportingTaskNode taskNode; if (creationSuccessful) { - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type,variableRegistry); } taskNode.setName(task.getClass().getSimpleName()); @@ -3018,7 +3026,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R service.verifyCanDelete(); try (final NarCloseable x = NarCloseable.withNarLoader()) { - final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null); + final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null,variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index bcb3feb..cbf3b9d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -33,6 +33,7 @@ import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; @@ -49,24 +50,27 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon private volatile String comment; private volatile ScheduledState scheduledState = ScheduledState.STOPPED; + protected final VariableRegistry variableRegistry; + public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, - final ValidationContextFactory validationContextFactory) { + final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry) { this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory, - reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName()); + reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName(),variableRegistry); } public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final String componentType, final String componentCanonicalClass) { + final String componentType, final String componentCanonicalClass, VariableRegistry variableRegistry) { super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass); this.reportingTask = reportingTask; this.processScheduler = processScheduler; this.serviceLookup = controllerServiceProvider; + this.variableRegistry = variableRegistry; } @Override @@ -111,7 +115,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon @Override public ConfigurationContext getConfigurationContext() { - return new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod()); + return new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod(), variableRegistry); } @Override @@ -141,6 +145,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon return super.removeProperty(name); } + public boolean isDisabled() { return scheduledState == ScheduledState.DISABLED; } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index b174c4c..205a690 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -34,6 +34,7 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.EventAccess; @@ -50,16 +51,18 @@ public class StandardReportingContext implements ReportingContext, ControllerSer private final ControllerServiceProvider serviceProvider; private final Map<PropertyDescriptor, String> properties; private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; + private final VariableRegistry variableRegistry; public StandardReportingContext(final FlowController flowController, final BulletinRepository bulletinRepository, - final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider, final ReportingTask reportingTask) { + final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider, final ReportingTask reportingTask, + final VariableRegistry variableRegistry) { this.flowController = flowController; this.eventAccess = flowController; this.bulletinRepository = bulletinRepository; this.properties = Collections.unmodifiableMap(properties); this.serviceProvider = serviceProvider; this.reportingTask = reportingTask; - + this.variableRegistry = variableRegistry; preparedQueries = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { final PropertyDescriptor desc = entry.getKey(); @@ -106,7 +109,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer @Override public PropertyValue getProperty(final PropertyDescriptor property) { final String configuredValue = properties.get(property); - return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, this, preparedQueries.get(property)); + return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, this, preparedQueries.get(property), variableRegistry); } @Override @@ -148,4 +151,5 @@ public class StandardReportingContext implements ReportingContext, ControllerSer public StateManager getStateManager() { return flowController.getStateManagerProvider().getStateManager(reportingTask.getIdentifier()); } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java index 539ada1..b57faa1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java @@ -24,6 +24,7 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingTask; @@ -32,15 +33,16 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme private final FlowController flowController; public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller, - final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory) { - super(reportingTask, id, controller, processScheduler, validationContextFactory); + final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, + final VariableRegistry variableRegistry) { + super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry); this.flowController = controller; } public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final String componentType, final String canonicalClassName) { - super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName); + final String componentType, final String canonicalClassName, VariableRegistry variableRegistry) { + super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,variableRegistry); this.flowController = controller; } @@ -56,6 +58,6 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme @Override public ReportingContext getReportingContext() { - return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask()); + return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask(), variableRegistry); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 262ac77..091d1f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.Connectables; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.ReflectionUtils; @@ -61,6 +62,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private final ProcessContextFactory contextFactory; private final AtomicInteger maxThreadCount; private final StringEncryptor encryptor; + private final VariableRegistry variableRegistry; private volatile String adminYieldDuration = "1 sec"; @@ -68,7 +70,8 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider, - final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) { + final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor, + final VariableRegistry variableRegistry) { super(flowEngine); this.serviceProvider = serviceProvider; this.stateManagerProvider = stateManagerProvider; @@ -76,6 +79,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { this.contextFactory = contextFactory; this.maxThreadCount = new AtomicInteger(maxThreadCount); this.encryptor = encryptor; + this.variableRegistry = variableRegistry; for (int i = 0; i < maxThreadCount; i++) { final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); @@ -185,7 +189,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { if (connectable instanceof ProcessorNode) { final ProcessorNode procNode = (ProcessorNode) connectable; final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, - encryptor, getStateManager(connectable.getIdentifier())); + encryptor, getStateManager(connectable.getIdentifier()), variableRegistry); final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); final ProcessSessionFactory sessionFactory; http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java index 3f19d28..34e7989 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java @@ -38,6 +38,7 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.FormatUtils; import org.quartz.CronExpression; import org.slf4j.Logger; @@ -50,15 +51,18 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { private final FlowController flowController; private final ProcessContextFactory contextFactory; private final StringEncryptor encryptor; + private final VariableRegistry variableRegistry; private volatile String adminYieldDuration = "1 sec"; private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<>(); - public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor) { + public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor, + final VariableRegistry variableRegistry) { super(flowEngine); this.flowController = flowController; this.contextFactory = contextFactory; this.encryptor = enryptor; + this.variableRegistry = variableRegistry; } private StateManager getStateManager(final String componentId) { @@ -141,7 +145,7 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier())); + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()), variableRegistry); ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); continuallyRunTask = runnableTask; } else { http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 53fc726..dad73c1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -52,6 +52,7 @@ import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; @@ -81,12 +82,14 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); private final StringEncryptor encryptor; + private final VariableRegistry variableRegistry; public StandardProcessScheduler(final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, - final StateManagerProvider stateManagerProvider) { + final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry) { this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; this.stateManagerProvider = stateManagerProvider; + this.variableRegistry = variableRegistry; administrativeYieldDuration = NiFiProperties.getInstance().getAdministrativeYieldDuration(); administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS); @@ -290,7 +293,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public synchronized void startProcessor(final ProcessorNode procNode) { StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, - this.encryptor, getStateManager(procNode.getIdentifier())); + this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry); final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode)); SchedulingAgentCallback callback = new SchedulingAgentCallback() { @@ -324,7 +327,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public synchronized void stopProcessor(final ProcessorNode procNode) { StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, - this.encryptor, getStateManager(procNode.getIdentifier())); + this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry); final ScheduleState state = getScheduleState(procNode); procNode.stop(this.componentLifeCycleThreadPool, processContext, new Callable<Boolean>() { http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index 0436e21..f94beff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -37,6 +37,7 @@ import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; @@ -50,14 +51,17 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { private final FlowController flowController; private final ProcessContextFactory contextFactory; private final StringEncryptor encryptor; + private final VariableRegistry variableRegistry; private volatile String adminYieldDuration = "1 sec"; - public TimerDrivenSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor encryptor) { + public TimerDrivenSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor encryptor, + final VariableRegistry variableRegistry) { super(flowEngine); this.flowController = flowController; this.contextFactory = contextFactory; this.encryptor = encryptor; + this.variableRegistry = variableRegistry; final String boredYieldDuration = NiFiProperties.getInstance().getBoredYieldDuration(); try { @@ -100,7 +104,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { // Determine the task to run and create it. if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier())); + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()), variableRegistry); final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java index d57e61f..61db819 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java @@ -28,6 +28,7 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.FormatUtils; public class StandardConfigurationContext implements ConfigurationContext { @@ -35,13 +36,17 @@ public class StandardConfigurationContext implements ConfigurationContext { private final ConfiguredComponent component; private final ControllerServiceLookup serviceLookup; private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; + private final VariableRegistry variableRegistry; private final String schedulingPeriod; private final Long schedulingNanos; - public StandardConfigurationContext(final ConfiguredComponent component, final ControllerServiceLookup serviceLookup, final String schedulingPeriod) { + public StandardConfigurationContext(final ConfiguredComponent component, final ControllerServiceLookup serviceLookup, final String schedulingPeriod, + final VariableRegistry variableRegistry) { this.component = component; this.serviceLookup = serviceLookup; this.schedulingPeriod = schedulingPeriod; + this.variableRegistry = variableRegistry; + if (schedulingPeriod == null) { schedulingNanos = null; } else { @@ -68,7 +73,7 @@ public class StandardConfigurationContext implements ConfigurationContext { @Override public PropertyValue getProperty(final PropertyDescriptor property) { final String configuredValue = component.getProperty(property); - return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, serviceLookup, preparedQueries.get(property)); + return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, serviceLookup, preparedQueries.get(property), variableRegistry); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 0c1e6b3..aba5f0c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -48,6 +48,7 @@ import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +60,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private final ControllerService proxedControllerService; private final ControllerService implementation; private final ControllerServiceProvider serviceProvider; - + private final VariableRegistry variableRegistry; private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -73,21 +74,24 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private final AtomicBoolean active; public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, + final VariableRegistry variableRegistry) { this(proxiedControllerService, implementation, id, validationContextFactory, serviceProvider, - implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName()); + implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName(), variableRegistry); } public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final String componentType, final String componentCanonicalClass) { + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, + final String componentType, final String componentCanonicalClass, VariableRegistry variableRegistry) { super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass); this.proxedControllerService = proxiedControllerService; this.implementation = implementation; this.serviceProvider = serviceProvider; this.active = new AtomicBoolean(); + this.variableRegistry = variableRegistry; + } @Override @@ -335,7 +339,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) { if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) { this.active.set(true); - final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null); + final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, variableRegistry); scheduler.execute(new Runnable() { @Override public void run() { @@ -398,7 +402,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) { - final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null); + final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, variableRegistry); scheduler.execute(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 2c8d258..7c6cd29 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -55,6 +55,8 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContextFactory; +import org.apache.nifi.registry.VariableRegistry; + import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.ReflectionUtils; @@ -69,6 +71,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private static final Set<Method> validDisabledMethods; private final BulletinRepository bulletinRepo; private final StateManagerProvider stateManagerProvider; + private final VariableRegistry variableRegistry; private final FlowController flowController; static { @@ -84,12 +87,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, - final StateManagerProvider stateManagerProvider) { + final StateManagerProvider stateManagerProvider,final VariableRegistry variableRegistry) { this.flowController = flowController; this.processScheduler = scheduler; this.bulletinRepo = bulletinRepo; this.stateManagerProvider = stateManagerProvider; + this.variableRegistry = variableRegistry; } @@ -187,9 +191,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService); originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this, getStateManager(id))); - final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this, variableRegistry); - final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this); + final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this, variableRegistry); serviceNodeHolder.set(serviceNode); serviceNode.setName(rawClass.getSimpleName()); @@ -258,7 +262,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final String componentType = "(Missing) " + simpleClassName; final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, proxiedService, id, - new StandardValidationContextFactory(this), this, componentType, type); + new StandardValidationContextFactory(this,variableRegistry), this, componentType, type, variableRegistry); return serviceNode; } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java index 8f20aab..7e247fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java @@ -46,11 +46,12 @@ import org.apache.nifi.controller.state.config.StateProviderConfiguration; import org.apache.nifi.framework.security.util.SslContextFactory; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.processor.StandardValidationContext; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StandardStateManagerProvider implements StateManagerProvider { +public class StandardStateManagerProvider implements StateManagerProvider{ private static final Logger logger = LoggerFactory.getLogger(StandardStateManagerProvider.class); private final ConcurrentMap<String, StateManager> stateManagers = new ConcurrentHashMap<>(); @@ -62,12 +63,12 @@ public class StandardStateManagerProvider implements StateManagerProvider { this.clusterStateProvider = clusterStateProvider; } - public static StateManagerProvider create(final NiFiProperties properties) throws ConfigParseException, IOException { - final StateProvider localProvider = createLocalStateProvider(properties); + public static StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry) throws ConfigParseException, IOException { + final StateProvider localProvider = createLocalStateProvider(properties,variableRegistry); final StateProvider clusterProvider; if (properties.isNode()) { - clusterProvider = createClusteredStateProvider(properties); + clusterProvider = createClusteredStateProvider(properties,variableRegistry); } else { clusterProvider = null; } @@ -75,19 +76,20 @@ public class StandardStateManagerProvider implements StateManagerProvider { return new StandardStateManagerProvider(localProvider, clusterProvider); } - private static StateProvider createLocalStateProvider(final NiFiProperties properties) throws IOException, ConfigParseException { + private static StateProvider createLocalStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry) throws IOException, ConfigParseException { final File configFile = properties.getStateManagementConfigFile(); - return createStateProvider(configFile, Scope.LOCAL, properties); + return createStateProvider(configFile, Scope.LOCAL, properties, variableRegistry); } - private static StateProvider createClusteredStateProvider(final NiFiProperties properties) throws IOException, ConfigParseException { + private static StateProvider createClusteredStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry) throws IOException, ConfigParseException { final File configFile = properties.getStateManagementConfigFile(); - return createStateProvider(configFile, Scope.CLUSTER, properties); + return createStateProvider(configFile, Scope.CLUSTER, properties, variableRegistry); } - private static StateProvider createStateProvider(final File configFile, final Scope scope, final NiFiProperties properties) throws ConfigParseException, IOException { + private static StateProvider createStateProvider(final File configFile, final Scope scope, final NiFiProperties properties, + final VariableRegistry variableRegistry) throws ConfigParseException, IOException { final String providerId; final String providerIdPropertyName; final String providerDescription; @@ -166,17 +168,18 @@ public class StandardStateManagerProvider implements StateManagerProvider { + " is configured to use scope " + scope); } + //create variable registry final Map<PropertyDescriptor, PropertyValue> propertyMap = new HashMap<>(); final Map<PropertyDescriptor, String> propertyStringMap = new HashMap<>(); for (final PropertyDescriptor descriptor : provider.getPropertyDescriptors()) { - propertyMap.put(descriptor, new StandardPropertyValue(descriptor.getDefaultValue(), null)); + propertyMap.put(descriptor, new StandardPropertyValue(descriptor.getDefaultValue(),null, variableRegistry)); propertyStringMap.put(descriptor, descriptor.getDefaultValue()); } for (final Map.Entry<String, String> entry : providerConfig.getProperties().entrySet()) { final PropertyDescriptor descriptor = provider.getPropertyDescriptor(entry.getKey()); propertyStringMap.put(descriptor, entry.getValue()); - propertyMap.put(descriptor, new StandardPropertyValue(entry.getValue(), null)); + propertyMap.put(descriptor, new StandardPropertyValue(entry.getValue(),null, variableRegistry)); } final SSLContext sslContext = SslContextFactory.createSslContext(properties, false); @@ -186,7 +189,7 @@ public class StandardStateManagerProvider implements StateManagerProvider { provider.initialize(initContext); } - final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null, null, null); + final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null, null, null,variableRegistry); final Collection<ValidationResult> results = provider.validate(validationContext); final StringBuilder validationFailures = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 80e898a..60f7c6f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -54,6 +54,7 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.StandardProcessContext; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.util.NiFiProperties; @@ -101,6 +102,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>(); private final Map<String, Template> templates = new HashMap<>(); private final StringEncryptor encryptor; + private final VariableRegistry variableRegistry; private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -109,7 +111,8 @@ public final class StandardProcessGroup implements ProcessGroup { private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class); public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler, - final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController) { + final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController, + final VariableRegistry variableRegistry) { this.id = id; this.controllerServiceProvider = serviceProvider; this.parent = new AtomicReference<>(); @@ -117,6 +120,7 @@ public final class StandardProcessGroup implements ProcessGroup { this.comments = new AtomicReference<>(""); this.encryptor = encryptor; this.flowController = flowController; + this.variableRegistry = variableRegistry; name = new AtomicReference<>(); position = new AtomicReference<>(new Position(0D, 0D)); @@ -345,7 +349,7 @@ public final class StandardProcessGroup implements ProcessGroup { private void shutdown(final ProcessGroup procGroup) { for (final ProcessorNode node : procGroup.getProcessors()) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier())); + final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()),variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext); } } @@ -710,7 +714,7 @@ public final class StandardProcessGroup implements ProcessGroup { } try (final NarCloseable x = NarCloseable.withNarLoader()) { - final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier())); + final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()),variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e); @@ -1839,7 +1843,7 @@ public final class StandardProcessGroup implements ProcessGroup { service.verifyCanDelete(); try (final NarCloseable x = NarCloseable.withNarLoader()) { - final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null); + final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 3c5acbb..7bb9035 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -36,6 +36,7 @@ import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.Connectables; public class StandardProcessContext implements ProcessContext, ControllerServiceLookup { @@ -45,12 +46,15 @@ public class StandardProcessContext implements ProcessContext, ControllerService private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; private final StringEncryptor encryptor; private final StateManager stateManager; + private final VariableRegistry variableRegistry; - public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager) { + public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager, + final VariableRegistry variableRegistry) { this.procNode = processorNode; this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; this.stateManager = stateManager; + this.variableRegistry = variableRegistry; preparedQueries = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) { @@ -86,12 +90,12 @@ public class StandardProcessContext implements ProcessContext, ControllerService final String setPropertyValue = procNode.getProperty(descriptor); final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue; - return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor)); + return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), variableRegistry); } @Override public PropertyValue newPropertyValue(final String rawValue) { - return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue)); + return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue), variableRegistry); } @Override @@ -221,4 +225,5 @@ public class StandardProcessContext implements ProcessContext, ControllerService public String getName() { return procNode.getName(); } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java index 7282ee9..dfc7965 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java @@ -1,3 +1,4 @@ + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -36,6 +37,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.expression.ExpressionLanguageCompiler; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.VariableRegistry; public class StandardValidationContext implements ValidationContext { @@ -45,12 +47,13 @@ public class StandardValidationContext implements ValidationContext { private final Map<String, Boolean> expressionLanguageSupported; private final String annotationData; private final Set<String> serviceIdentifiersToNotValidate; + private final VariableRegistry variableRegistry; private final String groupId; private final String componentId; public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties, - final String annotationData, final String groupId, final String componentId) { - this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId); + final String annotationData, final String groupId, final String componentId, VariableRegistry variableRegistry) { + this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId,variableRegistry); } public StandardValidationContext( @@ -59,11 +62,12 @@ public class StandardValidationContext implements ValidationContext { final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId, - final String componentId) { + final String componentId, VariableRegistry variableRegistry) { this.controllerServiceProvider = controllerServiceProvider; this.properties = new HashMap<>(properties); this.annotationData = annotationData; this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate; + this.variableRegistry = variableRegistry; this.groupId = groupId; this.componentId = componentId; @@ -87,12 +91,12 @@ public class StandardValidationContext implements ValidationContext { @Override public PropertyValue newPropertyValue(final String rawValue) { - return new StandardPropertyValue(rawValue, controllerServiceProvider, Query.prepare(rawValue)); + return new StandardPropertyValue(rawValue, controllerServiceProvider, Query.prepare(rawValue), variableRegistry); } @Override public ExpressionLanguageCompiler newExpressionLanguageCompiler() { - return new StandardExpressionLanguageCompiler(); + return new StandardExpressionLanguageCompiler(variableRegistry); } @Override @@ -100,13 +104,13 @@ public class StandardValidationContext implements ValidationContext { final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(controllerService.getIdentifier()); final ProcessGroup serviceGroup = serviceNode.getProcessGroup(); final String serviceGroupId = serviceGroup == null ? null : serviceGroup.getIdentifier(); - return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData(), serviceGroupId, serviceNode.getIdentifier()); + return new StandardValidationContext(controllerServiceProvider, serviceNode.getProperties(), serviceNode.getAnnotationData(), serviceGroupId, serviceNode.getIdentifier(),variableRegistry); } @Override public PropertyValue getProperty(final PropertyDescriptor property) { final String configuredValue = properties.get(property); - return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, controllerServiceProvider, preparedQueries.get(property)); + return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, controllerServiceProvider, preparedQueries.get(property), variableRegistry); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java index 1c52e17..020e979 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java @@ -23,23 +23,26 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.registry.VariableRegistry; public class StandardValidationContextFactory implements ValidationContextFactory { private final ControllerServiceProvider serviceProvider; + private final VariableRegistry variableRegistry; - public StandardValidationContextFactory(final ControllerServiceProvider serviceProvider) { + public StandardValidationContextFactory(final ControllerServiceProvider serviceProvider, final VariableRegistry variableRegistry) { this.serviceProvider = serviceProvider; + this.variableRegistry = variableRegistry; } @Override public ValidationContext newValidationContext(final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId, final String componentId) { - return new StandardValidationContext(serviceProvider, properties, annotationData, groupId, componentId); + return new StandardValidationContext(serviceProvider, properties, annotationData, groupId, componentId,variableRegistry); } @Override public ValidationContext newValidationContext(final Set<String> serviceIdentifiersToNotValidate, final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId, String componentId) { - return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData, groupId, componentId); + return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData, groupId, componentId,variableRegistry); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java index fb0ce7c..2760ca9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java @@ -24,8 +24,11 @@ import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.FactoryBean; import org.springframework.context.ApplicationContext; @@ -37,6 +40,8 @@ import org.springframework.context.ApplicationContextAware; @SuppressWarnings("rawtypes") public class FlowControllerFactoryBean implements FactoryBean, ApplicationContextAware { + private static final Logger LOG = LoggerFactory.getLogger(FlowControllerFactoryBean.class); + private ApplicationContext applicationContext; private FlowController flowController; private NiFiProperties properties; @@ -45,6 +50,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex private StringEncryptor encryptor; private BulletinRepository bulletinRepository; private ClusterCoordinator clusterCoordinator; + private VariableRegistry variableRegistry; @Override public Object getObject() throws Exception { @@ -63,7 +69,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex nodeProtocolSender, bulletinRepository, clusterCoordinator, - heartbeatMonitor); + heartbeatMonitor, variableRegistry); } else { flowController = FlowController.createStandaloneInstance( flowFileEventRepository, @@ -71,7 +77,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex authorizer, auditService, encryptor, - bulletinRepository); + bulletinRepository, variableRegistry); } } @@ -79,6 +85,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex return flowController; } + + @Override public Class getObjectType() { return FlowController.class; @@ -114,6 +122,10 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex this.bulletinRepository = bulletinRepository; } + public void setVariableRegistry(VariableRegistry variableRegistry) { + this.variableRegistry = variableRegistry; + } + public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { this.clusterCoordinator = clusterCoordinator; } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index 3cd5159..59c1b80 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -27,6 +27,11 @@ <!-- nifi properties created via getInstance using a file path specified as a system property --> <bean id="nifiProperties" class="org.apache.nifi.util.NiFiProperties" factory-method="getInstance"/> + <!-- variable registry --> + <bean id="variableRegistry" class="org.apache.nifi.registry.VariableRegistryUtils" factory-method="createCustomVariableRegistry"> + <constructor-arg type="java.nio.file.Path[]" value="#{nifiProperties.getVariableRegistryPropertiesPaths()}" /> + </bean> + <!-- flow file event repository --> <bean id="flowFileEventRepository" class="org.apache.nifi.spring.RingBufferEventRepositoryBean"> </bean> @@ -41,6 +46,7 @@ <property name="encryptor" ref="stringEncryptor" /> <property name="bulletinRepository" ref="bulletinRepository" /> <property name="clusterCoordinator" ref="clusterCoordinator" /> + <property name="variableRegistry" ref="variableRegistry"/> </bean> <!-- flow service --> http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index 15611af..c864902 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -26,6 +26,8 @@ import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.VolatileBulletinRepository; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.VariableRegistryUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -63,6 +65,7 @@ public class StandardFlowServiceTest { private AuditService mockAuditService; private StringEncryptor mockEncryptor; private RevisionManager revisionManager; + private VariableRegistry variableRegistry; @BeforeClass public static void setupSuite() { @@ -72,11 +75,13 @@ public class StandardFlowServiceTest { @Before public void setup() throws Exception { properties = NiFiProperties.getInstance(); + variableRegistry = VariableRegistryUtils.createCustomVariableRegistry(properties.getVariableRegistryPropertiesPaths()); mockFlowFileEventRepository = mock(FlowFileEventRepository.class); authorizer = mock(Authorizer.class); mockAuditService = mock(AuditService.class); revisionManager = mock(RevisionManager.class); - flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor, new VolatileBulletinRepository()); + flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor, + new VolatileBulletinRepository(), variableRegistry); flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer); } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index 6e3f475..e9c2c00 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -33,6 +33,8 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.MockProvenanceRepository; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.VariableRegistryUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.NiFiProperties; import org.junit.After; @@ -61,6 +63,7 @@ public class TestFlowController { private StringEncryptor encryptor; private NiFiProperties properties; private BulletinRepository bulletinRepo; + private VariableRegistry variableRegistry; @Before public void setup() { @@ -111,9 +114,10 @@ public class TestFlowController { policies1.add(policy2); authorizer = new MockPolicyBasedAuthorizer(groups1, users1, policies1); + variableRegistry = VariableRegistryUtils.createCustomVariableRegistry(properties.getVariableRegistryPropertiesPaths()); bulletinRepo = Mockito.mock(BulletinRepository.class); - controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, authorizer, auditService, encryptor, bulletinRepo); + controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, authorizer, auditService, encryptor, bulletinRepo,variableRegistry); standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor()); } @@ -165,7 +169,7 @@ public class TestFlowController { assertNotEquals(authFingerprint, authorizer.getFingerprint()); controller.shutdown(true); - controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, authorizer, auditService, encryptor, bulletinRepo); + controller = FlowController.createStandaloneInstance(flowFileEventRepo, properties, authorizer, auditService, encryptor, bulletinRepo,variableRegistry); controller.synchronize(standardFlowSynchronizer, proposedDataFlow); assertEquals(authFingerprint, authorizer.getFingerprint()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/8412d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index e5b3342..a7137a1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -43,6 +43,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.provenance.MockProvenanceRepository; +import org.apache.nifi.registry.VariableRegistryUtils; import org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Before; @@ -661,7 +662,8 @@ public class TestProcessorLifecycle { properties.setProperty("nifi.remote.input.secure", ""); return FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), properties, - mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository()); + mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(), + VariableRegistryUtils.createCustomVariableRegistry(properties.getVariableRegistryPropertiesPaths())); } /**
