http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 9611b73..07923c6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -16,6 +16,14 @@ */ package org.apache.nifi.controller.reporting; +import java.net.URL; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; @@ -33,7 +41,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.CharacterFilterUtils; @@ -42,14 +50,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.AnnotationUtils; -import java.net.URL; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode { private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class); @@ -66,7 +66,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, - final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry, + final ValidationContextFactory validationContextFactory, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) { this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory, @@ -77,7 +77,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, + final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing);
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java index 53b7d15..40bdf8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java @@ -22,13 +22,13 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; -import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ValidationContextFactory; -import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingTask; @@ -38,14 +38,14 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final VariableRegistry variableRegistry, final ReloadComponent reloadComponent) { + final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) { super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, reloadComponent); this.flowController = controller; } public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final String componentType, final String canonicalClassName, final VariableRegistry variableRegistry, + final String componentType, final String canonicalClassName, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName, variableRegistry, reloadComponent, isExtensionMissing); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 0af8657..58d25bb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -45,7 +45,6 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.Connectables; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.ReflectionUtils; @@ -62,7 +61,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private final ProcessContextFactory contextFactory; private final AtomicInteger maxThreadCount; private final StringEncryptor encryptor; - private final VariableRegistry variableRegistry; private volatile String adminYieldDuration = "1 sec"; @@ -70,8 +68,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider, - final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor, - final VariableRegistry variableRegistry) { + final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) { super(flowEngine); this.serviceProvider = serviceProvider; this.stateManagerProvider = stateManagerProvider; @@ -79,7 +76,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { this.contextFactory = contextFactory; this.maxThreadCount = new AtomicInteger(maxThreadCount); this.encryptor = encryptor; - this.variableRegistry = variableRegistry; for (int i = 0; i < maxThreadCount; i++) { final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); @@ -188,8 +184,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { if (connectable instanceof ProcessorNode) { final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, - encryptor, getStateManager(connectable.getIdentifier()), variableRegistry); + final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier())); final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); final ProcessSessionFactory sessionFactory; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java index bd3c2bb..eb855d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java @@ -38,7 +38,6 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.FormatUtils; import org.quartz.CronExpression; import org.slf4j.Logger; @@ -51,18 +50,15 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { private final FlowController flowController; private final ProcessContextFactory contextFactory; private final StringEncryptor encryptor; - private final VariableRegistry variableRegistry; private volatile String adminYieldDuration = "1 sec"; private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<>(); - public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor, - final VariableRegistry variableRegistry) { + public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor) { super(flowEngine); this.flowController = flowController; this.contextFactory = contextFactory; this.encryptor = enryptor; - this.variableRegistry = variableRegistry; } private StateManager getStateManager(final String componentId) { @@ -145,7 +141,7 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent { if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()), variableRegistry); + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier())); ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); continuallyRunTask = runnableTask; } else { http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 5368d37..122e50c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -53,7 +53,6 @@ import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; @@ -84,19 +83,16 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); private final StringEncryptor encryptor; - private final VariableRegistry variableRegistry; public StandardProcessScheduler( final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManagerProvider stateManagerProvider, - final VariableRegistry variableRegistry, final NiFiProperties nifiProperties ) { this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; this.stateManagerProvider = stateManagerProvider; - this.variableRegistry = variableRegistry; administrativeYieldDuration = nifiProperties.getAdministrativeYieldDuration(); administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS); @@ -301,15 +297,17 @@ public final class StandardProcessScheduler implements ProcessScheduler { * @see StandardProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable). */ @Override - public synchronized void startProcessor(final ProcessorNode procNode) { + public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode) { StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, - this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry); + this.encryptor, getStateManager(procNode.getIdentifier())); final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode)); + final CompletableFuture<Void> future = new CompletableFuture<>(); SchedulingAgentCallback callback = new SchedulingAgentCallback() { @Override public void trigger() { getSchedulingAgent(procNode).schedule(procNode, scheduleState); + future.complete(null); } @Override @@ -324,7 +322,9 @@ public final class StandardProcessScheduler implements ProcessScheduler { } }; + LOG.info("Starting {}", procNode); procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback); + return future; } /** @@ -335,12 +335,13 @@ public final class StandardProcessScheduler implements ProcessScheduler { * @see StandardProcessorNode#stop(ScheduledExecutorService, org.apache.nifi.processor.ProcessContext, SchedulingAgent, ScheduleState) */ @Override - public synchronized void stopProcessor(final ProcessorNode procNode) { + public synchronized CompletableFuture<Void> stopProcessor(final ProcessorNode procNode) { StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, - this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry); + this.encryptor, getStateManager(procNode.getIdentifier())); final ScheduleState state = getScheduleState(procNode); - procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state); + LOG.info("Stopping {}", procNode); + return procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state); } @Override @@ -537,20 +538,35 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public CompletableFuture<Void> enableControllerService(final ControllerServiceNode service) { + LOG.info("Enabling " + service); return service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis); } @Override - public void disableControllerService(final ControllerServiceNode service) { - service.disable(this.componentLifeCycleThreadPool); + public CompletableFuture<Void> disableControllerService(final ControllerServiceNode service) { + LOG.info("Disabling {}", service); + return service.disable(this.componentLifeCycleThreadPool); } @Override - public void disableControllerServices(final List<ControllerServiceNode> services) { + public CompletableFuture<Void> disableControllerServices(final List<ControllerServiceNode> services) { + if (services == null || services.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> future = null; if (!requireNonNull(services).isEmpty()) { for (ControllerServiceNode controllerServiceNode : services) { - this.disableControllerService(controllerServiceNode); + final CompletableFuture<Void> serviceFuture = this.disableControllerService(controllerServiceNode); + + if (future == null) { + future = serviceFuture; + } else { + future = CompletableFuture.allOf(future, serviceFuture); + } } } + + return future; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index a82fde4..9dc329a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -37,7 +37,6 @@ import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; @@ -51,7 +50,6 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { private final FlowController flowController; private final ProcessContextFactory contextFactory; private final StringEncryptor encryptor; - private final VariableRegistry variableRegistry; private volatile String adminYieldDuration = "1 sec"; @@ -60,13 +58,11 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor encryptor, - final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) { super(flowEngine); this.flowController = flowController; this.contextFactory = contextFactory; this.encryptor = encryptor; - this.variableRegistry = variableRegistry; final String boredYieldDuration = nifiProperties.getBoredYieldDuration(); try { @@ -109,7 +105,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { // Determine the task to run and create it. if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()), variableRegistry); + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier())); final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 61d9d29..f30a71e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -144,6 +144,16 @@ public class FlowFromDOMFactory { dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); dto.setComments(getString(element, "comment")); + final Map<String, String> variables = new HashMap<>(); + final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable"); + for (int i = 0; i < variableList.getLength(); i++) { + final Element variableElement = (Element) variableList.item(i); + final String name = variableElement.getAttribute("name"); + final String value = variableElement.getAttribute("value"); + variables.put(name, value); + } + dto.setVariables(variables); + final Set<ProcessorDTO> processors = new HashSet<>(); final Set<ConnectionDTO> connections = new HashSet<>(); final Set<FunnelDTO> funnels = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index 4a9a0f7..2a8df96 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -37,6 +37,8 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.persistence.TemplateSerializer; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.util.CharacterFilterUtils; @@ -70,7 +72,7 @@ import java.util.concurrent.TimeUnit; */ public class StandardFlowSerializer implements FlowSerializer { - private static final String MAX_ENCODING_VERSION = "1.1"; + private static final String MAX_ENCODING_VERSION = "1.2"; private final StringEncryptor encryptor; @@ -202,6 +204,18 @@ public class StandardFlowSerializer implements FlowSerializer { for (final Template template : group.getTemplates()) { addTemplate(element, template); } + + final VariableRegistry variableRegistry = group.getVariableRegistry(); + for (final Map.Entry<VariableDescriptor, String> entry : variableRegistry.getVariableMap().entrySet()) { + addVariable(element, entry.getKey().getName(), entry.getValue()); + } + } + + private static void addVariable(final Element parentElement, final String variableName, final String variableValue) { + final Element variableElement = parentElement.getOwnerDocument().createElement("variable"); + variableElement.setAttribute("name", variableName); + variableElement.setAttribute("value", variableValue); + parentElement.appendChild(variableElement); } private static void addBundle(final Element parentElement, final BundleCoordinate coordinate) { http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java new file mode 100644 index 0000000..148b847 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.service; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class ServiceStateTransition { + private ControllerServiceState state = ControllerServiceState.DISABLED; + private final List<CompletableFuture<?>> enabledFutures = new ArrayList<>(); + private final List<CompletableFuture<?>> disabledFutures = new ArrayList<>(); + + + public synchronized boolean transitionToEnabling(final ControllerServiceState expectedState, final CompletableFuture<?> enabledFuture) { + if (expectedState != state) { + return false; + } + + state = ControllerServiceState.ENABLING; + enabledFutures.add(enabledFuture); + return true; + } + + public synchronized boolean enable() { + if (state != ControllerServiceState.ENABLING) { + return false; + } + + state = ControllerServiceState.ENABLED; + enabledFutures.stream().forEach(future -> future.complete(null)); + return true; + } + + public synchronized boolean transitionToDisabling(final ControllerServiceState expectedState, final CompletableFuture<?> disabledFuture) { + if (expectedState != state) { + return false; + } + + state = ControllerServiceState.DISABLING; + disabledFutures.add(disabledFuture); + return true; + } + + public synchronized void disable() { + state = ControllerServiceState.DISABLED; + disabledFutures.stream().forEach(future -> future.complete(null)); + } + + public synchronized ControllerServiceState getState() { + return state; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index fa4ab84..216a996 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -16,6 +16,24 @@ */ package org.apache.nifi.controller.service; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.documentation.DeprecationNotice; @@ -30,48 +48,30 @@ import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractConfiguredComponent; -import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; -import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.util.CharacterFilterUtils; import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.InvocationTargetException; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode { private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class); private final AtomicReference<ControllerServiceDetails> controllerServiceHolder = new AtomicReference<>(null); private final ControllerServiceProvider serviceProvider; - private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED); + private final ServiceStateTransition stateTransition = new ServiceStateTransition(); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -85,7 +85,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService, final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory, - final ControllerServiceProvider serviceProvider, final VariableRegistry variableRegistry, final ReloadComponent reloadComponent) { + final ControllerServiceProvider serviceProvider, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) { this(implementation, proxiedControllerService, invocationHandler, id, validationContextFactory, serviceProvider, implementation.getComponent().getClass().getSimpleName(), implementation.getComponent().getClass().getCanonicalName(), variableRegistry, reloadComponent, false); @@ -94,7 +94,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService, final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass, - final VariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { + final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing); this.serviceProvider = serviceProvider; @@ -363,7 +363,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public ControllerServiceState getState() { - return stateRef.get(); + return stateTransition.getState(); } @Override @@ -394,7 +394,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i public CompletableFuture<Void> enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) { final CompletableFuture<Void> future = new CompletableFuture<>(); - if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) { + if (this.stateTransition.transitionToEnabling(ControllerServiceState.DISABLED, future)) { synchronized (active) { this.active.set(true); } @@ -410,17 +410,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i boolean shouldEnable = false; synchronized (active) { - shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED); + shouldEnable = active.get() && stateTransition.enable(); } - future.complete(null); - if (!shouldEnable) { LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated."); // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be // set to DISABLING (see disable() operation) invokeDisable(configContext); - stateRef.set(ControllerServiceState.DISABLED); + stateTransition.disable(); } } catch (Exception e) { future.completeExceptionally(e); @@ -437,7 +435,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext); } - stateRef.set(ControllerServiceState.DISABLED); + stateTransition.disable(); } } } @@ -464,7 +462,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i * DISABLED state. */ @Override - public void disable(ScheduledExecutorService scheduler) { + public CompletableFuture<Void> disable(ScheduledExecutorService scheduler) { /* * The reason for synchronization is to ensure consistency of the * service state when another thread is in the middle of enabling this @@ -475,7 +473,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i this.active.set(false); } - if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + if (this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLED, future)) { final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry()); scheduler.execute(new Runnable() { @Override @@ -483,13 +482,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i try { invokeDisable(configContext); } finally { - stateRef.set(ControllerServiceState.DISABLED); + stateTransition.disable(); } } }); } else { - this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING); + this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLING, future); } + + return future; } /** @@ -515,7 +516,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) { Collection<ValidationResult> results = null; - if (stateRef.get() == ControllerServiceState.DISABLED) { + if (getState() == ControllerServiceState.DISABLED) { results = super.getValidationErrors(serviceIdentifiersNotToValidate); } return results != null ? results : Collections.emptySet(); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 5c4e394..0745ed0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; @@ -62,7 +63,9 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContextFactory; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.NiFiProperties; @@ -148,8 +151,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(originalService, bundleCoordinate, serviceLogger); final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, serviceLogger); + final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler, - id, validationContextFactory, this, variableRegistry, flowController); + id, validationContextFactory, this, componentVarRegistry, flowController); serviceNode.setName(rawClass.getSimpleName()); invocationHandler.setServiceNode(serviceNode); @@ -226,8 +230,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, null); + final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id, - new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, flowController, true); + new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController, true); serviceCache.putIfAbsent(id, serviceNode); return serviceNode; @@ -235,9 +240,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) { - // Get a list of all Controller Services that need to be disabled, in the order that they need to be - // disabled. - final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + // Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled. + final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable); @@ -258,8 +262,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) { // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, // or a service that references this controller service, etc. - final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class); - final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); + final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class); + final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class); final Set<ConfiguredComponent> updated = new HashSet<>(); @@ -298,8 +302,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, // or a service that references this controller service, etc. - final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class); - final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); + final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class); + final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class); final Set<ConfiguredComponent> updated = new HashSet<>(); @@ -333,7 +337,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override - public Future<Void> enableControllerService(final ControllerServiceNode serviceNode) { + public CompletableFuture<Void> enableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanEnable(); return processScheduler.enableControllerService(serviceNode); } @@ -450,9 +454,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override - public void disableControllerService(final ControllerServiceNode serviceNode) { + public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanDisable(); - processScheduler.disableControllerService(serviceNode); + return processScheduler.disableControllerService(serviceNode); } @Override @@ -589,43 +593,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi return allServices; } - /** - * Returns a List of all components that reference the given referencedNode - * (either directly or indirectly through another service) that are also of - * the given componentType. The list that is returned is in the order in - * which they will need to be 'activated' (enabled/started). - * - * @param referencedNode node - * @param componentType type - * @return list of components - */ - private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) { - final List<T> references = new ArrayList<>(); - - for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) { - if (componentType.isAssignableFrom(referencingComponent.getClass())) { - references.add(componentType.cast(referencingComponent)); - } - - if (referencingComponent instanceof ControllerServiceNode) { - final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent; - - // find components recursively that depend on referencingNode. - final List<T> recursive = findRecursiveReferences(referencingNode, componentType); - - // For anything that depends on referencing node, we want to add it to the list, but we know - // that it must come after the referencing node, so we first remove any existing occurrence. - references.removeAll(recursive); - references.addAll(recursive); - } - } - - return references; - } @Override public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) { - final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + final List<ControllerServiceNode> recursiveReferences = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); logger.debug("Enabling the following Referencing Services for {}: {}", serviceNode, recursiveReferences); return enableReferencingServices(serviceNode, recursiveReferences); } @@ -658,7 +629,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { - final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices); for (final ControllerServiceNode referencingService : referencingServices) { @@ -668,9 +639,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) { - final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class); - final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); - final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class); + final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); + final List<ReportingTaskNode> referencingReportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class); + final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class); final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices); @@ -689,9 +660,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) { - // Get a list of all Controller Services that need to be disabled, in the order that they need to be - // disabled. - final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + // Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled. + final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable); for (final ControllerServiceNode nodeToDisable : toDisable) { http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java index d2f3833..285b8dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java @@ -16,8 +16,10 @@ */ package org.apache.nifi.controller.service; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.nifi.controller.ConfiguredComponent; @@ -101,4 +103,35 @@ public class StandardControllerServiceReference implements ControllerServiceRefe return references; } + + + @Override + public <T> List<T> findRecursiveReferences(final Class<T> componentType) { + return findRecursiveReferences(referenced, componentType); + } + + private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) { + final List<T> references = new ArrayList<>(); + + for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) { + if (componentType.isAssignableFrom(referencingComponent.getClass())) { + references.add(componentType.cast(referencingComponent)); + } + + if (referencingComponent instanceof ControllerServiceNode) { + final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent; + + // find components recursively that depend on referencingNode. + final List<T> recursive = findRecursiveReferences(referencingNode, componentType); + + // For anything that depends on referencing node, we want to add it to the list, but we know + // that it must come after the referencing node, so we first remove any existing occurrence. + references.removeAll(recursive); + references.addAll(recursive); + } + } + + return references; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 1ef3e8b..3957955 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -16,6 +16,24 @@ */ package org.apache.nifi.fingerprint; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; @@ -38,23 +56,6 @@ import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; -import javax.xml.XMLConstants; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.validation.Schema; -import javax.xml.validation.SchemaFactory; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - /** * <p>Creates a fingerprint of a flow.xml. The order of elements or attributes in the flow.xml does not influence the fingerprint generation. * @@ -324,9 +325,22 @@ public class FingerprintFactory { addFunnelFingerprint(builder, funnelElem); } + // add variables + final NodeList variableElems = DomUtils.getChildNodesByTagName(processGroupElem, "variable"); + final List<Element> sortedVarList = sortElements(variableElems, getVariableNameComparator()); + for (final Element varElem : sortedVarList) { + addVariableFingerprint(builder, varElem); + } + return builder; } + private void addVariableFingerprint(final StringBuilder builder, final Element variableElement) { + final String variableName = variableElement.getAttribute("name"); + final String variableValue = variableElement.getAttribute("value"); + builder.append(variableName).append("=").append(variableValue); + } + private StringBuilder addFlowFileProcessorFingerprint(final StringBuilder builder, final Element processorElem) throws FingerprintException { // id appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "id")); @@ -662,6 +676,27 @@ public class FingerprintFactory { }; } + private Comparator<Element> getVariableNameComparator() { + return new Comparator<Element>() { + @Override + public int compare(final Element e1, final Element e2) { + if (e1 == null && e2 == null) { + return 0; + } + if (e1 == null) { + return 1; + } + if (e2 == null) { + return -1; + } + + final String varName1 = e1.getAttribute("name"); + final String varName2 = e2.getAttribute("name"); + return varName1.compareTo(varName2); + } + }; + } + private Comparator<Element> getProcessorPropertiesComparator() { return new Comparator<Element>() { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 2907704..2b7b51d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -16,13 +16,31 @@ */ package org.apache.nifi.groups; -import com.google.common.collect.Sets; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.VariableImpact; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -39,6 +57,7 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; @@ -50,13 +69,15 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.util.NiFiProperties; @@ -66,20 +87,7 @@ import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; +import com.google.common.collect.Sets; public final class StandardProcessGroup implements ProcessGroup { @@ -104,7 +112,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>(); private final Map<String, Template> templates = new HashMap<>(); private final StringEncryptor encryptor; - private final VariableRegistry variableRegistry; + private final MutableVariableRegistry variableRegistry; private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -114,7 +122,7 @@ public final class StandardProcessGroup implements ProcessGroup { public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController, - final VariableRegistry variableRegistry) { + final MutableVariableRegistry variableRegistry) { this.id = id; this.controllerServiceProvider = serviceProvider; this.parent = new AtomicReference<>(); @@ -361,7 +369,7 @@ public final class StandardProcessGroup implements ProcessGroup { private void shutdown(final ProcessGroup procGroup) { for (final ProcessorNode node : procGroup.getProcessors()) { try (final NarCloseable x = NarCloseable.withComponentNarLoader(node.getProcessor().getClass(), node.getIdentifier())) { - final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), variableRegistry); + final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier())); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext); } } @@ -548,6 +556,8 @@ public final class StandardProcessGroup implements ProcessGroup { writeLock.lock(); try { group.setParent(this); + group.getVariableRegistry().setParent(getVariableRegistry()); + processGroups.put(Objects.requireNonNull(group).getIdentifier(), group); flowController.onProcessGroupAdded(group); } finally { @@ -709,6 +719,7 @@ public final class StandardProcessGroup implements ProcessGroup { } processor.setProcessGroup(this); + processor.getVariableRegistry().setParent(getVariableRegistry()); processors.put(processorId, processor); flowController.onProcessorAdded(processor); } finally { @@ -732,7 +743,7 @@ public final class StandardProcessGroup implements ProcessGroup { } try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getProcessor().getClass(), processor.getIdentifier())) { - final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), variableRegistry); + final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier())); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e); @@ -1081,7 +1092,7 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public void startProcessor(final ProcessorNode processor) { + public CompletableFuture<Void> startProcessor(final ProcessorNode processor) { readLock.lock(); try { if (getProcessor(processor.getIdentifier()) == null) { @@ -1092,10 +1103,10 @@ public final class StandardProcessGroup implements ProcessGroup { if (state == ScheduledState.DISABLED) { throw new IllegalStateException("Processor is disabled"); } else if (state == ScheduledState.RUNNING) { - return; + return CompletableFuture.completedFuture(null); } - scheduler.startProcessor(processor); + return scheduler.startProcessor(processor); } finally { readLock.unlock(); } @@ -1162,7 +1173,7 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public void stopProcessor(final ProcessorNode processor) { + public CompletableFuture<Void> stopProcessor(final ProcessorNode processor) { readLock.lock(); try { if (!processors.containsKey(processor.getIdentifier())) { @@ -1173,10 +1184,10 @@ public final class StandardProcessGroup implements ProcessGroup { if (state == ScheduledState.DISABLED) { throw new IllegalStateException("Processor is disabled"); } else if (state == ScheduledState.STOPPED) { - return; + return CompletableFuture.completedFuture(null); } - scheduler.stopProcessor(processor); + return scheduler.stopProcessor(processor); } finally { readLock.unlock(); } @@ -1854,6 +1865,7 @@ public final class StandardProcessGroup implements ProcessGroup { } service.setProcessGroup(this); + service.getVariableRegistry().setParent(getVariableRegistry()); this.controllerServices.put(service.getIdentifier(), service); LOG.info("{} added to {}", service, this); } finally { @@ -2583,4 +2595,129 @@ public final class StandardProcessGroup implements ProcessGroup { readLock.unlock(); } } + + @Override + public MutableVariableRegistry getVariableRegistry() { + return variableRegistry; + } + + @Override + public void verifyCanUpdateVariables(final Map<String, String> updatedVariables) { + if (updatedVariables == null || updatedVariables.isEmpty()) { + return; + } + + readLock.lock(); + try { + final Set<String> updatedVariableNames = getUpdatedVariables(updatedVariables); + if (updatedVariableNames.isEmpty()) { + return; + } + + for (final ProcessorNode processor : findAllProcessors()) { + if (!processor.isRunning()) { + continue; + } + + for (final String variableName : updatedVariableNames) { + for (final VariableImpact impact : getVariableImpact(processor)) { + if (impact.isImpacted(variableName)) { + throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + processor + ", which is currently running"); + } + } + } + } + + for (final ControllerServiceNode service : findAllControllerServices()) { + if (!service.isActive()) { + continue; + } + + for (final String variableName : updatedVariableNames) { + for (final VariableImpact impact : getVariableImpact(service)) { + if (impact.isImpacted(variableName)) { + throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + service + ", which is currently running"); + } + } + } + } + } finally { + readLock.unlock(); + } + } + + @Override + public Set<ConfiguredComponent> getComponentsAffectedByVariable(final String variableName) { + final Set<ConfiguredComponent> affected = new HashSet<>(); + + // Determine any Processors that references the variable + for (final ProcessorNode processor : findAllProcessors()) { + for (final VariableImpact impact : getVariableImpact(processor)) { + if (impact.isImpacted(variableName)) { + affected.add(processor); + } + } + } + + // Determine any Controller Service that references the variable. If Service A references a variable, + // then that means that any other component that references that service is also affected, so recursively + // find any references to that service and add it. + for (final ControllerServiceNode service : findAllControllerServices()) { + for (final VariableImpact impact : getVariableImpact(service)) { + if (impact.isImpacted(variableName)) { + affected.add(service); + + final ControllerServiceReference reference = service.getReferences(); + affected.addAll(reference.findRecursiveReferences(ConfiguredComponent.class)); + } + } + } + + return affected; + } + + + private Set<String> getUpdatedVariables(final Map<String, String> newVariableValues) { + final Set<String> updatedVariableNames = new HashSet<>(); + + final MutableVariableRegistry registry = getVariableRegistry(); + for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) { + final String varName = entry.getKey(); + final String newValue = entry.getValue(); + + final String curValue = registry.getVariableValue(varName); + if (!Objects.equals(newValue, curValue)) { + updatedVariableNames.add(varName); + } + } + + return updatedVariableNames; + } + + private List<VariableImpact> getVariableImpact(final ConfiguredComponent component) { + return component.getProperties().values().stream() + .map(propVal -> Query.prepare(propVal).getVariableImpact()) + .collect(Collectors.toList()); + } + + @Override + public void setVariables(final Map<String, String> variables) { + writeLock.lock(); + try { + verifyCanUpdateVariables(variables); + + if (variables == null) { + return; + } + + final Map<VariableDescriptor, String> variableMap = new HashMap<>(); + variables.entrySet().stream() // cannot use Collectors.toMap because value may be null + .forEach(entry -> variableMap.put(new VariableDescriptor(entry.getKey()), entry.getValue())); + + variableRegistry.setVariables(variableMap); + } finally { + writeLock.unlock(); + } + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 2714392..dfba330 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -26,8 +26,8 @@ import java.util.Set; import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; -import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.attribute.expression.language.Query.Range; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.StateManager; @@ -37,7 +37,6 @@ import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.Connectables; public class StandardProcessContext implements ProcessContext, ControllerServiceLookup { @@ -47,15 +46,12 @@ public class StandardProcessContext implements ProcessContext, ControllerService private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; private final StringEncryptor encryptor; private final StateManager stateManager; - private final VariableRegistry variableRegistry; - public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager, - final VariableRegistry variableRegistry) { + public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager) { this.procNode = processorNode; this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; this.stateManager = stateManager; - this.variableRegistry = variableRegistry; preparedQueries = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) { @@ -93,12 +89,12 @@ public class StandardProcessContext implements ProcessContext, ControllerService final String setPropertyValue = procNode.getProperty(descriptor); final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue; - return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), variableRegistry); + return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry()); } @Override public PropertyValue newPropertyValue(final String rawValue) { - return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue), variableRegistry); + return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue), procNode.getVariableRegistry()); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java index 662169c..2f38aee 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java @@ -53,8 +53,8 @@ public class StandardValidationContext implements ValidationContext { private final String componentId; public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties, - final String annotationData, final String groupId, final String componentId, VariableRegistry variableRegistry) { - this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId,variableRegistry); + final String annotationData, final String groupId, final String componentId, final VariableRegistry variableRegistry) { + this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId, variableRegistry); } public StandardValidationContext( @@ -63,7 +63,8 @@ public class StandardValidationContext implements ValidationContext { final Map<PropertyDescriptor, String> properties, final String annotationData, final String groupId, - final String componentId, VariableRegistry variableRegistry) { + final String componentId, + final VariableRegistry variableRegistry) { this.controllerServiceProvider = controllerServiceProvider; this.properties = new HashMap<>(properties); this.annotationData = annotationData;
