http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/FileBasedVariableRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/FileBasedVariableRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/FileBasedVariableRegistry.java new file mode 100644 index 0000000..fb7a19d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/FileBasedVariableRegistry.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.variable; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.VariableRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A file based variable registry that loads all properties from files specified + * during construction and is backed by system properties and environment + * variables accessible to the JVM. + */ +public class FileBasedVariableRegistry implements VariableRegistry { + + private final static Logger LOG = LoggerFactory.getLogger(FileBasedVariableRegistry.class); + final Map<VariableDescriptor, String> map; + + public FileBasedVariableRegistry(final Path[] propertiesPaths) { + final Map<VariableDescriptor, String> newMap = new HashMap<>(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY.getVariableMap()); + final int systemEnvPropCount = newMap.size(); + int totalPropertiesLoaded = systemEnvPropCount; + LOG.info("Loaded {} properties from system properties and environment variables",systemEnvPropCount); + try { + for (final Path path : propertiesPaths) { + if (Files.exists(path)) { + final AtomicInteger propsLoaded = new AtomicInteger(0); + try (final InputStream inStream = new BufferedInputStream(new FileInputStream(path.toFile()))) { + Properties properties = new Properties(); + properties.load(inStream); + properties.entrySet().stream().forEach((entry) -> { + final VariableDescriptor desc = new VariableDescriptor.Builder(entry.getKey().toString()) + .description(path.toString()) + .sensitive(false) + .build(); + newMap.put(desc, entry.getValue().toString()); + propsLoaded.incrementAndGet(); + }); + } + totalPropertiesLoaded += propsLoaded.get(); + if(propsLoaded.get() > 0){ + LOG.info("Loaded {} properties from '{}'", propsLoaded.get(), path); + }else{ + LOG.warn("No properties loaded from '{}'", path); + } + } else { + LOG.warn("Skipping property file {} as it does not appear to exist", path); + } + } + } catch (final IOException ioe) { + LOG.error("Unable to complete variable registry loading from files due to ", ioe); + } + + LOG.info("Loaded a total of {} properties. Including precedence overrides effective accessible registry key size is {}", totalPropertiesLoaded, newMap.size()); + map = Collections.unmodifiableMap(newMap); + } + + @Override + public Map<VariableDescriptor, String> getVariableMap() { + return map; + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/MutableVariableRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/MutableVariableRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/MutableVariableRegistry.java new file mode 100644 index 0000000..e37b402 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/MutableVariableRegistry.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.variable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.VariableRegistry; + +public class MutableVariableRegistry extends StandardComponentVariableRegistry implements VariableRegistry { + private volatile Map<VariableDescriptor, String> variableMap = new HashMap<>(); + + public MutableVariableRegistry(final VariableRegistry parent) { + super(parent); + } + + @Override + public Map<VariableDescriptor, String> getVariableMap() { + return variableMap; + } + + public void setVariables(final Map<VariableDescriptor, String> variables) { + final Map<VariableDescriptor, String> curVariableMap = this.variableMap; + final Map<VariableDescriptor, String> updatedVariableMap = new HashMap<>(curVariableMap); + for (final Map.Entry<VariableDescriptor, String> entry : variables.entrySet()) { + if (entry.getValue() == null) { + updatedVariableMap.remove(entry.getKey()); + } else { + updatedVariableMap.put(entry.getKey(), entry.getValue()); + } + } + + this.variableMap = Collections.unmodifiableMap(updatedVariableMap); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/StandardComponentVariableRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/StandardComponentVariableRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/StandardComponentVariableRegistry.java new file mode 100644 index 0000000..b3c31f3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/variable/StandardComponentVariableRegistry.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.variable; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.VariableRegistry; + +public class StandardComponentVariableRegistry implements ComponentVariableRegistry { + private volatile VariableRegistry parent; + + public StandardComponentVariableRegistry(final VariableRegistry parent) { + this.parent = Objects.requireNonNull(parent); + } + + @Override + public Map<VariableDescriptor, String> getVariableMap() { + return Collections.emptyMap(); + } + + @Override + public VariableRegistry getParent() { + return parent; + } + + @Override + public void setParent(final VariableRegistry parentRegistry) { + this.parent = parentRegistry; + } + + @Override + public VariableDescriptor getVariableKey(final String name) { + if (name == null) { + return null; + } + + final VariableDescriptor spec = new VariableDescriptor(name); + for (final Map.Entry<VariableDescriptor, String> entry : getVariableMap().entrySet()) { + if (entry.getKey().equals(spec)) { + return entry.getKey(); + } + } + + return null; + } + + @Override + public String getVariableValue(final String name) { + if (name == null) { + return null; + } + + final VariableDescriptor descriptor = new VariableDescriptor(name); + final String value = getVariableMap().get(descriptor); + if (value != null) { + return value; + } + + return parent.getVariableValue(descriptor); + } + + @Override + public String getVariableValue(final VariableDescriptor descriptor) { + if (descriptor == null) { + return null; + } + + final String value = getVariableMap().get(descriptor); + if (value != null) { + return value; + } + + return parent.getVariableValue(descriptor); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FileBasedVariableRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FileBasedVariableRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FileBasedVariableRegistry.java deleted file mode 100644 index c4079a7..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FileBasedVariableRegistry.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.nifi.registry.VariableDescriptor; -import org.apache.nifi.registry.VariableRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A file based variable registry that loads all properties from files specified - * during construction and is backed by system properties and environment - * variables accessible to the JVM. - */ -public class FileBasedVariableRegistry implements VariableRegistry { - - private final static Logger LOG = LoggerFactory.getLogger(FileBasedVariableRegistry.class); - final Map<VariableDescriptor, String> map; - - public FileBasedVariableRegistry(final Path[] propertiesPaths) { - final Map<VariableDescriptor, String> newMap = new HashMap<>(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY.getVariableMap()); - final int systemEnvPropCount = newMap.size(); - int totalPropertiesLoaded = systemEnvPropCount; - LOG.info("Loaded {} properties from system properties and environment variables",systemEnvPropCount); - try { - for (final Path path : propertiesPaths) { - if (Files.exists(path)) { - final AtomicInteger propsLoaded = new AtomicInteger(0); - try (final InputStream inStream = new BufferedInputStream(new FileInputStream(path.toFile()))) { - Properties properties = new Properties(); - properties.load(inStream); - properties.entrySet().stream().forEach((entry) -> { - final VariableDescriptor desc = new VariableDescriptor.Builder(entry.getKey().toString()) - .description(path.toString()) - .sensitive(false) - .build(); - newMap.put(desc, entry.getValue().toString()); - propsLoaded.incrementAndGet(); - }); - } - totalPropertiesLoaded += propsLoaded.get(); - if(propsLoaded.get() > 0){ - LOG.info("Loaded {} properties from '{}'", propsLoaded.get(), path); - }else{ - LOG.warn("No properties loaded from '{}'", path); - } - } else { - LOG.warn("Skipping property file {} as it does not appear to exist", path); - } - } - } catch (final IOException ioe) { - LOG.error("Unable to complete variable registry loading from files due to ", ioe); - } - LOG.info("Loaded a total of {} properties. Including precedence overrides effective accessible registry key size is {}", totalPropertiesLoaded, newMap.size()); - map = newMap; - } - - @Override - public Map<VariableDescriptor, String> getVariableMap() { - return Collections.unmodifiableMap(map); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 30fff1f..247a790 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -147,9 +147,15 @@ <xs:element name="connection" type="ConnectionType" minOccurs="0" maxOccurs="unbounded" /> <xs:element name="controllerService" type="ControllerServiceType" minOccurs="0" maxOccurs="unbounded" /> <xs:element name="template" type="TemplateType" minOccurs="0" maxOccurs="unbounded" /> + <xs:element name="variable" type="VariableType" minOccurs="0" maxOccurs="unbounded" /> </xs:sequence> </xs:complexType> + <xs:complexType name="VariableType"> + <xs:attribute name="name" /> + <xs:attribute name="value" /> + </xs:complexType> + <!-- Same as ProcessGroupType except that instead of input ports & output ports being of type PortType, they are of type RootGroupPortType --> <xs:complexType name="RootProcessGroupType"> http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index bf3fd2a..b3c426c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -23,7 +23,7 @@ </bean> <!-- variable registry --> - <bean id="variableRegistry" class="org.apache.nifi.util.FileBasedVariableRegistry"> + <bean id="variableRegistry" class="org.apache.nifi.registry.variable.FileBasedVariableRegistry"> <constructor-arg type="java.nio.file.Path[]" value="#{nifiProperties.getVariableRegistryPropertiesPaths()}" /> </bean> http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index 1957cf4..6973d12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -28,7 +28,7 @@ import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.util.FileBasedVariableRegistry; +import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index 691af10..596c00f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -49,9 +49,9 @@ import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java index 32b6f53..33c33c9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java @@ -17,6 +17,26 @@ package org.apache.nifi.controller; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; @@ -46,6 +66,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.test.processors.ModifiesClasspathNoAnnotationProcessor; import org.apache.nifi.test.processors.ModifiesClasspathProcessor; import org.apache.nifi.util.MockPropertyValue; @@ -56,26 +77,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - public class TestStandardProcessorNode { private MockVariableRegistry variableRegistry; @@ -98,10 +99,10 @@ public class TestStandardProcessorNode { final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, coordinate, null); final StandardProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, createValidationContextFactory(), null, null, - NiFiProperties.createBasicNiFiProperties(null, null), VariableRegistry.EMPTY_REGISTRY, reloadComponent); + NiFiProperties.createBasicNiFiProperties(null, null), new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestClasspathResources", true); - final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, null); + final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null); final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() { @Override public void postMonitor() { @@ -414,7 +415,8 @@ public class TestStandardProcessorNode { processor.initialize(initContext); final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), componentLog); - return new StandardProcessorNode(loggableComponent, uuid, validationContextFactory, processScheduler, null, niFiProperties, variableRegistry, reloadComponent); + return new StandardProcessorNode(loggableComponent, uuid, validationContextFactory, processScheduler, + null, niFiProperties, new StandardComponentVariableRegistry(variableRegistry), reloadComponent); } private static class MockReloadComponent implements ReloadComponent { http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java index 1903179..1dc74ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java @@ -16,15 +16,6 @@ */ package org.apache.nifi.controller.reporting; -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; - import org.apache.commons.io.FileUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; @@ -45,14 +36,23 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + public class TestStandardReportingContext { private FlowController controller; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 7f36a65..c544ef4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -46,7 +46,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.provenance.MockProvenanceRepository; -import org.apache.nifi.util.FileBasedVariableRegistry; +import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index dfd627f..2c59964 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -74,6 +74,7 @@ import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingContext; @@ -110,7 +111,7 @@ public class TestStandardProcessScheduler { systemBundle = SystemBundle.create(nifiProperties); ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); - scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider, variableRegistry, nifiProperties); + scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider, nifiProperties); scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class)); reportingTask = new TestReportingTask(); @@ -122,7 +123,8 @@ public class TestStandardProcessScheduler { final ComponentLog logger = Mockito.mock(ComponentLog.class); final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class); final LoggableComponent<ReportingTask> loggableComponent = new LoggableComponent<>(reportingTask, systemBundle.getBundleDetails().getCoordinate(), logger); - taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry, reloadComponent); + taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, + new StandardComponentVariableRegistry(variableRegistry), reloadComponent); controller = Mockito.mock(FlowController.class); @@ -196,7 +198,7 @@ public class TestStandardProcessScheduler { final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null); final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, new StandardValidationContextFactory(serviceProvider, variableRegistry), - scheduler, serviceProvider, nifiProperties, VariableRegistry.EMPTY_REGISTRY, reloadComponent); + scheduler, serviceProvider, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); rootGroup.addProcessor(procNode); Map<String, String> procProps = new HashMap<>(); @@ -580,6 +582,6 @@ public class TestStandardProcessScheduler { } private ProcessScheduler createScheduler() { - return new StandardProcessScheduler(null, null, stateMgrProvider, variableRegistry, nifiProperties); + return new StandardProcessScheduler(null, null, stateMgrProvider, nifiProperties); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java index c235a57..7a49103 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java @@ -29,8 +29,8 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java index 4a97b8a..55263f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java @@ -24,8 +24,8 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.Before; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index e82085e..0d15143 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -17,6 +17,22 @@ */ package org.apache.nifi.controller.service; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.beans.PropertyDescriptor; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import org.apache.nifi.bundle.Bundle; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; @@ -40,6 +56,8 @@ import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.MutableVariableRegistry; +import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.Assert; import org.junit.Before; @@ -49,22 +67,6 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.beans.PropertyDescriptor; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - public class TestStandardControllerServiceProvider { private static StateManagerProvider stateManagerProvider = new StateManagerProvider() { @@ -129,7 +131,7 @@ public class TestStandardControllerServiceProvider { } private StandardProcessScheduler createScheduler() { - return new StandardProcessScheduler(null, null, stateManagerProvider, variableRegistry, niFiProperties); + return new StandardProcessScheduler(null, null, stateManagerProvider, niFiProperties); } private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) { @@ -432,9 +434,10 @@ public class TestStandardControllerServiceProvider { final LoggableComponent<Processor> dummyProcessor = new LoggableComponent<>(new DummyProcessor(), systemBundle.getBundleDetails().getCoordinate(), null); final ProcessorNode procNode = new StandardProcessorNode(dummyProcessor, UUID.randomUUID().toString(), new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, niFiProperties, - VariableRegistry.EMPTY_REGISTRY, reloadComponent); + new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); - final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, Mockito.mock(FlowController.class), variableRegistry); + final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, Mockito.mock(FlowController.class), + new MutableVariableRegistry(variableRegistry)); group.addProcessor(procNode); procNode.setProcessGroup(group); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index b6f70a1..9725ed8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -18,11 +18,13 @@ package org.apache.nifi.controller.service.mock; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; @@ -32,6 +34,7 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Positionable; +import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.Snippet; @@ -41,12 +44,15 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; public class MockProcessGroup implements ProcessGroup { private final Map<String, ControllerServiceNode> serviceMap = new HashMap<>(); private final Map<String, ProcessorNode> processorMap = new HashMap<>(); private final FlowController flowController; + private final MutableVariableRegistry variableRegistry = new MutableVariableRegistry(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY); public MockProcessGroup(final FlowController flowController) { this.flowController = flowController; @@ -143,8 +149,8 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void startProcessor(final ProcessorNode processor) { - + public CompletableFuture<Void> startProcessor(final ProcessorNode processor) { + return CompletableFuture.completedFuture(null); } @Override @@ -163,8 +169,8 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void stopProcessor(final ProcessorNode processor) { - + public CompletableFuture<Void> stopProcessor(final ProcessorNode processor) { + return CompletableFuture.completedFuture(null); } @Override @@ -601,4 +607,22 @@ public class MockProcessGroup implements ProcessGroup { @Override public void verifyCanStop(final Connectable connectable) { } + + @Override + public MutableVariableRegistry getVariableRegistry() { + return variableRegistry; + } + + @Override + public void verifyCanUpdateVariables(Map<String, String> updatedVariables) { + } + + @Override + public void setVariables(Map<String, String> variables) { + } + + @Override + public Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName) { + return Collections.emptySet(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFileBasedVariableRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFileBasedVariableRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFileBasedVariableRegistry.java index cf44435..daed0e0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFileBasedVariableRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFileBasedVariableRegistry.java @@ -21,6 +21,7 @@ import java.nio.file.Paths; import java.util.Map; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.junit.Test; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java index 391f9e2..22ad122 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java @@ -26,6 +26,7 @@ import org.apache.nifi.action.details.FlowChangeMoveDetails; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.dao.ProcessGroupDAO; @@ -175,6 +176,60 @@ public class ProcessGroupAuditor extends NiFiAuditor { + "execution(void scheduleComponents(java.lang.String, org.apache.nifi.controller.ScheduledState, java.util.Set)) && " + "args(groupId, state)") public void scheduleComponentsAdvice(ProceedingJoinPoint proceedingJoinPoint, String groupId, ScheduledState state) throws Throwable { + final Operation operation; + // determine the running state + if (ScheduledState.RUNNING.equals(state)) { + operation = Operation.Start; + } else { + operation = Operation.Stop; + } + + saveUpdateAction(proceedingJoinPoint, groupId, operation); + } + + + /** + * Audits the update of controller serivce state + * + * @param proceedingJoinPoint join point + * @param groupId group id + * @param state controller serivce state state + * @throws Throwable ex + */ + @Around("within(org.apache.nifi.web.dao.ProcessGroupDAO+) && " + + "execution(java.util.concurrent.Future activateControllerServices(java.lang.String, org.apache.nifi.controller.service.ControllerServiceState, java.util.Set)) && " + + "args(groupId, state)") + public void activateControllerServicesAdvice(ProceedingJoinPoint proceedingJoinPoint, String groupId, ControllerServiceState state) throws Throwable { + + // determine the service state + final Operation operation; + if (ControllerServiceState.ENABLED.equals(state)) { + operation = Operation.Enable; + } else { + operation = Operation.Disable; + } + + saveUpdateAction(proceedingJoinPoint, groupId, operation); + } + + /** + * Audits the update of process group variable registry. + * + * @param proceedingJoinPoint join point + * @param groupId group id + * @throws Throwable ex + */ + @Around("within(org.apache.nifi.web.dao.ProcessGroupDAO+) && " + + "execution(org.apache.nifi.groups.ProcessGroup updateVariableRegistry(org.apache.nifi.web.api.dto.VariableRegistryDTO)) && " + + "args(groupId)") + public void updateVariableRegistryAdvice(ProceedingJoinPoint proceedingJoinPoint, String groupId) throws Throwable { + final Operation operation = Operation.Configure; + saveUpdateAction(proceedingJoinPoint, groupId, operation); + } + + + + private void saveUpdateAction(final ProceedingJoinPoint proceedingJoinPoint, final String groupId, final Operation operation) throws Throwable { ProcessGroupDAO processGroupDAO = getProcessGroupDAO(); ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); @@ -191,13 +246,7 @@ public class ProcessGroupAuditor extends NiFiAuditor { action.setSourceName(processGroup.getName()); action.setSourceType(Component.ProcessGroup); action.setTimestamp(new Date()); - - // determine the running state - if (ScheduledState.RUNNING.equals(state)) { - action.setOperation(Operation.Start); - } else { - action.setOperation(Operation.Stop); - } + action.setOperation(operation); // add this action saveAction(action, logger); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateRequest.java new file mode 100644 index 0000000..82d4683 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateRequest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.variable; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicReference; + +public class VariableRegistryUpdateRequest { + private final String requestId; + private final String processGroupId; + private volatile Date submissionTime = new Date(); + private volatile Date lastUpdated = new Date(); + private volatile boolean complete = false; + + private final AtomicReference<String> failureReason = new AtomicReference<>(); + + private final VariableRegistryUpdateStep identifyComponentsStep = new VariableRegistryUpdateStep("Identifying components affected"); + private final VariableRegistryUpdateStep stopProcessors = new VariableRegistryUpdateStep("Stopping affected Processors"); + private final VariableRegistryUpdateStep disableServices = new VariableRegistryUpdateStep("Disabling affected Controller Services"); + private final VariableRegistryUpdateStep applyUpdates = new VariableRegistryUpdateStep("Applying Updates"); + private final VariableRegistryUpdateStep enableServices = new VariableRegistryUpdateStep("Re-Enabling affected Controller Services"); + private final VariableRegistryUpdateStep startProcessors = new VariableRegistryUpdateStep("Restarting affected Processors"); + + public VariableRegistryUpdateRequest(final String requestId, final String processGroupId) { + this.requestId = requestId; + this.processGroupId = processGroupId; + } + + public String getProcessGroupId() { + return processGroupId; + } + + + public String getRequestId() { + return requestId; + } + + public Date getSubmissionTime() { + return submissionTime; + } + + public Date getLastUpdated() { + return lastUpdated; + } + + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } + + public VariableRegistryUpdateStep getIdentifyRelevantComponentsStep() { + return identifyComponentsStep; + } + + public VariableRegistryUpdateStep getStopProcessorsStep() { + return stopProcessors; + } + + public VariableRegistryUpdateStep getDisableServicesStep() { + return disableServices; + } + + public VariableRegistryUpdateStep getApplyUpdatesStep() { + return applyUpdates; + } + + public VariableRegistryUpdateStep getEnableServicesStep() { + return enableServices; + } + + public VariableRegistryUpdateStep getStartProcessorsStep() { + return startProcessors; + } + + public boolean isComplete() { + return complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + + public String getFailureReason() { + return failureReason.get(); + } + + public void setFailureReason(String reason) { + this.failureReason.set(reason); + } + + public void cancel() { + this.failureReason.compareAndSet(null, "Update was cancelled"); + this.complete = true; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateStep.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateStep.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateStep.java new file mode 100644 index 0000000..6013795 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateStep.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.variable; + +public class VariableRegistryUpdateStep { + private final String description; + private volatile boolean complete; + private volatile String failureReason; + + public VariableRegistryUpdateStep(final String description) { + this.description = description; + } + + public boolean isComplete() { + return complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + + public String getFailureReason() { + return failureReason; + } + + public void setFailureReason(String failureReason) { + this.failureReason = failureReason; + } + + public String getDescription() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 6f9ea98..6fed58e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.web; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + import org.apache.nifi.authorization.AuthorizeAccess; import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.user.NiFiUser; @@ -24,6 +31,7 @@ import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.api.dto.AccessPolicyDTO; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; @@ -57,6 +65,7 @@ import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.UserDTO; import org.apache.nifi.web.api.dto.UserGroupDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.action.HistoryDTO; import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; @@ -67,6 +76,7 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.entity.AccessPolicyEntity; import org.apache.nifi.web.api.entity.ActionEntity; +import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; @@ -96,13 +106,7 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; - -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; +import org.apache.nifi.web.api.entity.VariableRegistryEntity; /** * Defines the NiFiServiceFacade interface. @@ -887,6 +891,48 @@ public interface NiFiServiceFacade { ProcessGroupEntity getProcessGroup(String groupId); /** + * Returns the Variable Registry for the Process Group with the given ID + * + * @param groupId the ID of the Process Group + * @param includeAncestorGroups whether or not to include the variables that are defined in the the process group's parent group & its parent group, etc. + * @return the Variable Registry transfer object + */ + VariableRegistryEntity getVariableRegistry(String groupId, boolean includeAncestorGroups); + + /** + * Returns a Variable Registry that includes the variables in the given DTO but has the affected components populated + * + * @param variableRegistryDto the Variable Registry that contains the variables of interest + * @return a Variable Registry that has the affected components populated + */ + VariableRegistryEntity populateAffectedComponents(VariableRegistryDTO variableRegistryDto); + + /** + * Updates the variable registry on behalf of the user currently logged in + * + * @param revision Revision to compare with current base revision + * @param variableRegistryDto the Variable Registry + */ + VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto); + + /** + * Updates the variable registry on behalf of the given user + * + * @param user the user who performed the action + * @param revision Revision to compare with current base revision + * @param variableRegistryDto the Variable Registry + */ + VariableRegistryEntity updateVariableRegistry(NiFiUser user, Revision revision, VariableRegistryDTO variableRegistryDto); + + /** + * Determines which components will be affected by updating the given Variable Registry + * + * @param variableRegistryDto the variable registry + * @return the components that will be affected + */ + Set<AffectedComponentDTO> getComponentsAffectedByVariableRegistryUpdate(VariableRegistryDTO variableRegistryDto); + + /** * Gets all process groups in the specified parent group. * * @param parentGroupId The id of the parent group @@ -904,7 +950,37 @@ public interface NiFiServiceFacade { void verifyScheduleComponents(String processGroupId, ScheduledState state, Set<String> componentIds); /** - * Schedules all applicable components under the specified ProcessGroup. + * Verifies the controller services with the given ID's can be enabled or disabled + * + * @param processGroupId the ID of the process group + * @param state the state + * @param serviceIds the id's of the services + */ + void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Set<String> serviceIds); + + /** + * Enables or disables the controller services with the given IDs & Revisions on behalf of the currently logged in user + * + * @param processGroupId the ID of the process group + * @param state the desired state of the services + * @param serviceRevisions a mapping of Controller Service ID to current Revision + * @return snapshot + */ + ActivateControllerServicesEntity activateControllerServices(String processGroupId, ControllerServiceState state, Map<String, Revision> serviceRevisions); + + /** + * Enables or disables the controller services with the given IDs & Revisions on behalf of the given user + * + * @param user the user performing the action + * @param processGroupId the ID of the process group + * @param state the desired state of the services + * @param serviceRevisions a mapping of Controller Service ID to current Revision + * @return snapshot + */ + ActivateControllerServicesEntity activateControllerServices(NiFiUser user, String processGroupId, ControllerServiceState state, Map<String, Revision> serviceRevisions); + + /** + * Schedules all applicable components under the specified ProcessGroup on behalf of the currently logged in user. * * @param processGroupId The ProcessGroup id * @param state schedule state @@ -914,6 +990,17 @@ public interface NiFiServiceFacade { ScheduleComponentsEntity scheduleComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions); /** + * Schedules all applicable components under the specified ProcessGroup on behalf of the given user. + * + * @param user the user performing the action + * @param processGroupId The ProcessGroup id + * @param state schedule state + * @param componentRevisions components and their revision + * @return snapshot + */ + ScheduleComponentsEntity scheduleComponents(NiFiUser user, String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions); + + /** * Updates the specified process group. * * @param revision Revision to compare with current base revision @@ -923,6 +1010,14 @@ public interface NiFiServiceFacade { ProcessGroupEntity updateProcessGroup(Revision revision, ProcessGroupDTO processGroupDTO); /** + * Verifies that the Process Group identified by the given DTO can be updated in the manner appropriate according + * to the DTO + * + * @param processGroupDTO the DTO that indicates the updates to occur + */ + void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO); + + /** * Verifies the specified process group can be removed. * * @param groupId The id of the process group @@ -1378,9 +1473,22 @@ public interface NiFiServiceFacade { * Gets all controller services that belong to the given group and its parent/ancestor groups * * @param groupId the id of the process group of interest + * @param includeAncestorGroups if true, parent and ancestor groups' services will be returned as well + * @param includeDescendantGroups if true, child and descendant groups' services will be returned as well + * @return services + */ + Set<ControllerServiceEntity> getControllerServices(String groupId, boolean includeAncestorGroups, boolean includeDescendantGroups); + + /** + * Gets all controller services that belong to the given group and its parent/ancestor groups + * + * @param groupId the id of the process group of interest + * @param includeAncestorGroups if true, parent and ancestor groups' services will be returned as well + * @param includeDescendantGroups if true, child and descendant groups' services will be returned as well + * @param user the user that is retrieving the Controller Services * @return services */ - Set<ControllerServiceEntity> getControllerServices(String groupId); + Set<ControllerServiceEntity> getControllerServices(String groupId, boolean includeAncestorGroups, boolean includeDescendantGroups, NiFiUser user); /** * Gets the specified controller service. http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java index 11573a5..d0230db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java @@ -103,6 +103,18 @@ public class NiFiServiceFacadeLock { } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + + "execution(* activate*(..))") + public Object activateLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + return proceedWithWriteLock(proceedingJoinPoint); + } + + @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + + "execution(* populate*(..))") + public Object populateLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + return proceedWithWriteLock(proceedingJoinPoint); + } + + @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* get*(..))") public Object getLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { return proceedWithReadLock(proceedingJoinPoint);
