This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1040788 NIFI-7012: Refactored OnConfigurationRestored to support
sensitive property validation (#5415)
1040788 is described below
commit 104078868ed2e90eca805b9bddc6e7f02c29851c
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Oct 27 14:40:17 2021 -0400
NIFI-7012: Refactored OnConfigurationRestored to support sensitive property
validation (#5415)
---
.../lifecycle/OnConfigurationRestored.java | 9 ++-
.../nifi/util/StandardProcessorTestRunner.java | 6 +-
.../nifi/controller/StandardProcessorNode.java | 9 +++
.../apache/nifi/groups/StandardProcessGroup.java | 4 ++
.../nifi/controller/AbstractComponentNode.java | 74 ++++++++++++++++------
.../org/apache/nifi/controller/ProcessorNode.java | 7 ++
.../org/apache/nifi/controller/FlowController.java | 11 +++-
.../org/apache/nifi/controller/FlowSnippet.java | 3 +-
.../nifi/controller/StandardFlowSnippet.java | 14 ++--
.../nifi/controller/StandardReloadComponent.java | 5 ++
.../nifi/controller/flow/StandardFlowManager.java | 14 ++--
.../nifi/web/dao/impl/StandardProcessorDAO.java | 5 ++
.../processors/script/InvokeScriptedProcessor.java | 24 +++++++
.../script/AbstractScriptedControllerService.java | 8 +++
.../nifi/processors/script/TestInvokeGroovy.java | 26 +++++++-
.../nifi/processors/script/TestInvokeJython.java | 1 -
.../stateless/engine/StatelessFlowManager.java | 15 ++++-
.../stateless/engine/StatelessReloadComponent.java | 5 ++
18 files changed, 194 insertions(+), 46 deletions(-)
diff --git
a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnConfigurationRestored.java
b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnConfigurationRestored.java
index eaa8966..72c5bd3 100644
---
a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnConfigurationRestored.java
+++
b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnConfigurationRestored.java
@@ -17,6 +17,9 @@
package org.apache.nifi.annotation.lifecycle;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.ProcessContext;
+
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
@@ -32,7 +35,11 @@ import java.lang.annotation.Target;
* </p>
*
* <p>
- * Methods with this annotation must take zero arguments.
+ * Methods with this annotation are permitted to take no arguments or to take a
+ * single argument. If using a single argument, that argument must be of type
+ * {@link ConfigurationContext} if the component is a ReportingTask or a
+ * ControllerService. If the component is a Processor, then the argument must
be
+ * of type {@link ProcessContext}.
* </p>
*
* <p>
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index d489177..7de3bbe 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -135,8 +135,6 @@ public class StandardProcessorTestRunner implements
TestRunner {
}
triggerSerially = null !=
processor.getClass().getAnnotation(TriggerSerially.class);
-
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
processor);
}
@Override
@@ -195,6 +193,10 @@ public class StandardProcessorTestRunner implements
TestRunner {
context.assertValid();
context.enableExpressionValidation();
+
+ // Call onConfigurationRestored here, right before the test run, as
all properties should have been set by this point.
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
processor, this.context);
+
try {
if (initialize) {
try {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 875cbbd..a23fd0e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -1912,4 +1913,12 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
}
}
}
+
+ @Override
+ public void onConfigurationRestored(final ProcessContext context) {
+ try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(getExtensionManager(),
getProcessor().getClass(), getProcessor().getIdentifier())) {
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
getProcessor(), context);
+ }
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 880c6d2..7b32364 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -4989,6 +4989,10 @@ public final class StandardProcessGroup implements
ProcessGroup {
destination.addProcessor(procNode);
updateProcessor(procNode, proposed);
+ // Notify the processor node that the configuration (properties, e.g.)
has been restored
+ final StandardProcessContext processContext = new
StandardProcessContext(procNode, controllerServiceProvider, encryptor,
+
stateManagerProvider.getStateManager(procNode.getProcessor().getIdentifier()),
() -> false, nodeTypeProvider);
+ procNode.onConfigurationRestored(processContext);
return procNode;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 7c5165f..f4977cc 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -330,17 +331,6 @@ public abstract class AbstractComponentNode implements
ComponentNode {
}
}
}
- } else {
- final ParameterContext parameterContext =
getParameterContext();
- if (parameterContext != null) {
- for (final ParameterReference reference : referenceList) {
- final Optional<Parameter> parameter =
parameterContext.getParameter(reference.getParameterName());
- if (parameter.isPresent() &&
parameter.get().getDescriptor().isSensitive()) {
- throw new IllegalArgumentException("The property
'" + descriptor.getDisplayName() + "' cannot reference Parameter '" +
parameter.get().getDescriptor().getName()
- + "' because Sensitive Parameters may only be
referenced by Sensitive Properties.");
- }
- }
- }
}
if (descriptor.getControllerServiceDefinition() != null) {
@@ -550,15 +540,15 @@ public abstract class AbstractComponentNode implements
ComponentNode {
@Override
public Map<PropertyDescriptor, String> getRawPropertyValues() {
- return getPropertyValues(PropertyConfiguration::getRawValue);
+ return getPropertyValues((descriptor, config) -> config.getRawValue());
}
@Override
public Map<PropertyDescriptor, String> getEffectivePropertyValues() {
- return getPropertyValues(config ->
config.getEffectiveValue(getParameterContext()));
+ return getPropertyValues((descriptor, config) ->
getConfigValue(config, isResolveParameter(descriptor, config)));
}
- private Map<PropertyDescriptor, String> getPropertyValues(final
Function<PropertyConfiguration, String> valueFunction) {
+ private Map<PropertyDescriptor, String> getPropertyValues(final
BiFunction<PropertyDescriptor, PropertyConfiguration, String> valueFunction) {
try (final NarCloseable narCloseable =
NarCloseable.withComponentNarLoader(extensionManager,
getComponent().getClass(), getIdentifier())) {
final List<PropertyDescriptor> supported =
getComponent().getPropertyDescriptors();
@@ -569,7 +559,7 @@ public abstract class AbstractComponentNode implements
ComponentNode {
}
}
- properties.forEach((descriptor, config) -> props.put(descriptor,
valueFunction.apply(config)));
+ properties.forEach((descriptor, config) -> props.put(descriptor,
valueFunction.apply(descriptor, config)));
return props;
}
}
@@ -777,10 +767,22 @@ public abstract class AbstractComponentNode implements
ComponentNode {
for (final String paramName : referencedParameters) {
if (!validationContext.isParameterDefined(paramName)) {
results.add(new ValidationResult.Builder()
- .subject(propertyDescriptor.getDisplayName())
- .valid(false)
- .explanation("Property references Parameter '" +
paramName + "' but the currently selected Parameter Context does not have a
Parameter with that name")
- .build());
+ .subject(propertyDescriptor.getDisplayName())
+ .valid(false)
+ .explanation("Property references Parameter '" +
paramName + "' but the currently selected Parameter Context does not have a
Parameter with that name")
+ .build());
+ }
+ final Optional<Parameter> parameterRef =
parameterContext.getParameter(paramName);
+ if (parameterRef.isPresent()) {
+ final ParameterDescriptor parameterDescriptor =
parameterRef.get().getDescriptor();
+ if (parameterDescriptor.isSensitive() !=
propertyDescriptor.isSensitive()) {
+ results.add(new ValidationResult.Builder()
+ .subject(propertyDescriptor.getDisplayName())
+ .valid(false)
+ .explanation("The property '" +
propertyDescriptor.getDisplayName() + "' cannot reference Parameter '" +
parameterDescriptor.getName()
+ + "' because the Sensitivity of the
parameter does not match the Sensitivity of the property.")
+ .build());
+ }
}
}
}
@@ -1243,6 +1245,40 @@ public abstract class AbstractComponentNode implements
ComponentNode {
this.additionalResourcesFingerprint = additionalResourcesFingerprint;
}
+ // Determine whether the property value should be evaluated in terms of
the parameter context or not.
+ // If the sensitivity of the property does not match the sensitivity of
the parameter, the literal value will be returned
+ //
+ // Examples when SensitiveParam value = 'abc' and MY_PROP is non-sensitive:
+ // SensitiveProp --> 'abc'
+ // NonSensitiveProp --> '#{SensitiveParam}'
+ // context.getProperty(MY_PROP).getValue(); '#{SensitiveParam}'
+ private boolean isResolveParameter(final PropertyDescriptor descriptor,
final PropertyConfiguration config) {
+ boolean okToResolve = true;
+
+ final ParameterContext context = getParameterContext();
+ if (context == null) {
+ return false;
+ }
+ for (final ParameterReference reference :
config.getParameterReferences()) {
+ final String parameterName = reference.getParameterName();
+ final Optional<Parameter> optionalParameter =
context.getParameter(parameterName);
+ if (optionalParameter.isPresent()) {
+ final boolean paramIsSensitive =
optionalParameter.get().getDescriptor().isSensitive();
+ if (paramIsSensitive != descriptor.isSensitive()) {
+ okToResolve = false;
+ break;
+ }
+ }
+ }
+ return okToResolve;
+ }
+
+ // Evaluates the parameter value if it is ok to do so, otherwise returns
the raw "#{param}" literal.
+ // This is done to prevent evaluation of a sensitive parameter when
setting a non-sensitive property.
+ private String getConfigValue(final PropertyConfiguration config, final
boolean okToResolve) {
+ return okToResolve ? config.getEffectiveValue(getParameterContext()) :
config.getRawValue();
+ }
+
protected abstract ParameterContext getParameterContext();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 053e97d..61c6622 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -290,4 +290,11 @@ public abstract class ProcessorNode extends
AbstractComponentNode implements Con
* @return the desired state for this Processor
*/
public abstract ScheduledState getDesiredState();
+
+ /**
+ * This method will be called once the processor's configuration has been
restored (on startup, reload, e.g.)
+ *
+ * @param context The ProcessContext associated with the Processor
configuration
+ */
+ public abstract void onConfigurationRestored(ProcessContext context);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index b409103..cbf9cbe 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -113,6 +113,7 @@ import
org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
@@ -152,6 +153,7 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.StandardParameterContextManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.provenance.ComponentIdentifierLookup;
import org.apache.nifi.provenance.IdentifierLookup;
import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
@@ -985,7 +987,9 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
for (final ProcessorNode procNode :
flowManager.getRootGroup().findAllProcessors()) {
final Processor processor = procNode.getProcessor();
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(),
processor.getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
processor);
+ final StandardProcessContext processContext = new
StandardProcessContext(procNode, controllerServiceProvider, encryptor,
+
getStateManagerProvider().getStateManager(processor.getIdentifier()), () ->
false, this);
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
processor, processContext);
}
}
@@ -993,7 +997,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
final ControllerService service =
serviceNode.getControllerServiceImplementation();
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(extensionManager, service.getClass(),
service.getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
service);
+ final ConfigurationContext configurationContext = new
StandardConfigurationContext(serviceNode, controllerServiceProvider, null,
variableRegistry);
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
service, configurationContext);
}
}
@@ -1001,7 +1006,7 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
final ReportingTask task = taskNode.getReportingTask();
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(extensionManager, task.getClass(),
task.getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
task);
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
task, taskNode.getConfigurationContext());
}
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSnippet.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSnippet.java
index 54c0b02..52cecd8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSnippet.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSnippet.java
@@ -38,10 +38,11 @@ public interface FlowSnippet {
* Instantiates this snippet, adding it to the given Process Group
*
* @param flowManager the FlowManager
+ * @param flowController the FlowController
* @param group the group to add the snippet to
* @throws ProcessorInstantiationException if unable to instantiate any of
the Processors within the snippet
* @throws
org.apache.nifi.controller.exception.ControllerServiceInstantiationException if
unable to instantiate any of the Controller Services within the snippet
*/
- void instantiate(FlowManager flowManager, ProcessGroup group) throws
ProcessorInstantiationException;
+ void instantiate(FlowManager flowManager, FlowController flowController,
ProcessGroup group) throws ProcessorInstantiationException;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
index f98975c..8ba67dc 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
@@ -42,6 +42,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.PublicPort;
@@ -151,8 +152,8 @@ public class StandardFlowSnippet implements FlowSnippet {
}
}
- public void instantiate(final FlowManager flowManager, final ProcessGroup
group) throws ProcessorInstantiationException {
- instantiate(flowManager, group, true);
+ public void instantiate(final FlowManager flowManager, final
FlowController flowController, final ProcessGroup group) throws
ProcessorInstantiationException {
+ instantiate(flowManager, flowController, group, true);
}
@@ -221,7 +222,7 @@ public class StandardFlowSnippet implements FlowSnippet {
}
- public void instantiate(final FlowManager flowManager, final ProcessGroup
group, final boolean topLevel) {
+ public void instantiate(final FlowManager flowManager, final
FlowController flowController, final ProcessGroup group, final boolean
topLevel) {
//
// Instantiate Controller Services
//
@@ -406,6 +407,11 @@ public class StandardFlowSnippet implements FlowSnippet {
procNode.setProperties(config.getProperties());
}
+ // Notify the processor node that the configuration
(properties, e.g.) has been restored
+ final StandardProcessContext processContext = new
StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
flowController.getEncryptor(),
+
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()),
() -> false, flowController);
+ procNode.onConfigurationRestored(processContext);
+
group.addProcessor(procNode);
} finally {
procNode.resumeValidationTrigger();
@@ -526,7 +532,7 @@ public class StandardFlowSnippet implements FlowSnippet {
childTemplateDTO.setControllerServices(contents.getControllerServices());
final StandardFlowSnippet childSnippet = new
StandardFlowSnippet(childTemplateDTO, extensionManager);
- childSnippet.instantiate(flowManager, childGroup, false);
+ childSnippet.instantiate(flowManager, flowController, childGroup,
false);
if (groupDTO.getVersionControlInformation() != null) {
final VersionControlInformation vci =
StandardVersionControlInformation.Builder
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
index 74d5d4f..35bdae1 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
@@ -97,6 +97,11 @@ public class StandardReloadComponent implements
ReloadComponent {
// need to refresh the properties in case we are changing from ghost
component to real component
existingNode.refreshProperties();
+ // Notify the processor node that the configuration (properties, e.g.)
has been restored
+ final StandardProcessContext processContext = new
StandardProcessContext(existingNode,
flowController.getControllerServiceProvider(), flowController.getEncryptor(),
+
flowController.getStateManagerProvider().getStateManager(existingNode.getProcessor().getIdentifier()),
() -> false, flowController);
+ existingNode.onConfigurationRestored(processContext);
+
logger.debug("Triggering async validation of {} due to processor
reload", existingNode);
flowController.getValidationTrigger().trigger(existingNode);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 01b4871..6cf1ccf 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -271,7 +271,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
final FlowSnippet snippet = new StandardFlowSnippet(dto,
flowController.getExtensionManager());
snippet.validate(group);
- snippet.instantiate(this, group);
+ snippet.instantiate(this, flowController, group);
group.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
}
@@ -342,12 +342,6 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
}
throw new ComponentLifeCycleException("Failed to invoke
@OnAdded methods of " + procNode.getProcessor(), e);
}
-
- if (flowController.isInitialized()) {
- try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(extensionManager,
procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
procNode.getProcessor());
- }
- }
}
return procNode;
@@ -393,7 +387,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class,
taskNode.getReportingTask());
if (flowController.isInitialized()) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
taskNode.getReportingTask());
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
taskNode.getReportingTask(), taskNode.getConfigurationContext());
}
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke
On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
@@ -497,7 +491,9 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
if (flowController.isInitialized()) {
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(extensionManager, service.getClass(),
service.getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
service);
+ final ConfigurationContext configurationContext =
+ new StandardConfigurationContext(serviceNode,
controllerServiceProvider, null, flowController.getVariableRegistry());
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
service, configurationContext);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index d4bc543..3ab11a9 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -125,6 +125,11 @@ public class StandardProcessorDAO extends ComponentDAO
implements ProcessorDAO {
// configure the processor
configureProcessor(processor, processorDTO);
+ // Notify the processor node that the configuration (properties,
e.g.) has been restored
+ final StandardProcessContext processContext = new
StandardProcessContext(processor,
flowController.getControllerServiceProvider(), flowController.getEncryptor(),
+
flowController.getStateManagerProvider().getStateManager(processor.getProcessor().getIdentifier()),
() -> false, flowController);
+ processor.onConfigurationRestored(processContext);
+
return processor;
} catch (IllegalStateException | ComponentLifeCycleException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index 48f12ce..65146df 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -26,6 +26,7 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@@ -73,6 +74,7 @@ import java.util.concurrent.atomic.AtomicReference;
+ "Relationships or PropertyDescriptors defined by the scripted
processor will be added to the configuration dialog. The scripted processor can
"
+ "implement public void setLogger(ComponentLog logger) to get access
to the parent logger, as well as public void onScheduled(ProcessContext
context) and "
+ "public void onStopped(ProcessContext context) methods to be invoked
when the parent InvokeScriptedProcessor is scheduled or stopped, respectively.
"
+ + "NOTE: The script will be loaded when the processor is populated
with property values, see the Restrictions section for more security
implications. "
+ "Experimental: Impact of sustained usage not yet verified.")
@DynamicProperty(name = "A script engine property to update", value = "The
value to set it to",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
@@ -212,6 +214,12 @@ public class InvokeScriptedProcessor extends
AbstractSessionFactoryProcessor {
invokeScriptedProcessorMethod("onScheduled", context);
}
+ @OnConfigurationRestored
+ public void onConfigurationRestored(final ProcessContext context) {
+ scriptingComponentHelper.setupVariables(context);
+ setup();
+ }
+
public void setup() {
if (scriptNeedsReload.get() || processor.get() == null) {
if
(ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
@@ -242,8 +250,24 @@ public class InvokeScriptedProcessor extends
AbstractSessionFactoryProcessor {
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
+
+ // Update the ScriptingComponentHelper's value(s)
+ if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)) {
+ scriptingComponentHelper.setScriptPath(newValue);
+ } else if (ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor))
{
+ scriptingComponentHelper.setScriptBody(newValue);
+ } else if (ScriptingComponentUtils.MODULES.equals(descriptor)) {
+ scriptingComponentHelper.setScriptBody(newValue);
+ } else if
(scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
+ scriptingComponentHelper.setScriptEngineName(newValue);
+ }
+
scriptNeedsReload.set(true);
scriptRunner = null; //reset engine. This happens only when a
processor is stopped, so there won't be any performance impact in run-time.
+ if (isConfigurationRestored()) {
+ // Once the configuration has been restored, each call to
onPropertyModified() is due to a change made after the processor was loaded, so
reload the script
+ setup();
+ }
} else if (instance != null) {
// If the script provides a Processor, call its
onPropertyModified() method
try {
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java
index 8c8a391..5201fba 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java
@@ -17,6 +17,7 @@
package org.apache.nifi.script;
import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -24,6 +25,7 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.script.ScriptRunner;
@@ -116,6 +118,12 @@ public abstract class AbstractScriptedControllerService
extends AbstractControll
}
}
+ /**
+ * Lifecycle hook annotated with {@link OnConfigurationRestored}: hands the supplied context to
+ * {@code scriptingComponentHelper.setupVariables(...)} and then calls {@code setup()}.
+ *
+ * NOTE(review): this class is a controller service, yet the parameter is a
+ * {@code ProcessContext}. If the framework supplies a {@code ConfigurationContext} to
+ * services, a reflective invocation would not match this signature - confirm against callers.
+ *
+ * @param context the context passed on to the scripting helper
+ */
+ @OnConfigurationRestored
+ public void onConfigurationRestored(final ProcessContext context) {
+ scriptingComponentHelper.setupVariables(context);
+ setup();
+ }
+
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
index a3e948f..bb96578 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
@@ -18,7 +18,9 @@ package org.apache.nifi.processors.script;
import org.apache.commons.codec.binary.Hex;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.serialization.record.MockRecordParser;
@@ -140,14 +142,14 @@ public class TestInvokeGroovy extends BaseScriptTest {
*/
@Test
public void testInvokeScriptCausesException() {
- final TestRunner runner = TestRunners.newTestRunner(new
InvokeScriptedProcessor());
+ final TestRunner runner = TestRunners.newTestRunner(new
OverrideInvokeScriptedProcessor());
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE,
"Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
getFileContentsAsString(
TEST_RESOURCE_LOCATION +
"groovy/testInvokeScriptCausesException.groovy")
);
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
- assertThrows(AssertionError.class, () -> runner.run());
+ assertThrows(AssertionError.class, runner::run);
}
/**
@@ -198,7 +200,7 @@ public class TestInvokeGroovy extends BaseScriptTest {
runner.assertAllFlowFilesTransferred("success", 1);
final List<MockFlowFile> result =
runner.getFlowFilesForRelationship("success");
- assertTrue(result.size() == 1);
+ assertEquals(1, result.size());
final String expectedOutput = new
String(Hex.encodeHex(MessageDigestUtils.getDigest("testbla bla".getBytes())));
final MockFlowFile outputFlowFile = result.get(0);
outputFlowFile.assertContentEquals(expectedOutput);
@@ -238,4 +240,22 @@ public class TestInvokeGroovy extends BaseScriptTest {
MockFlowFile ff = result.get(0);
ff.assertContentEquals("48\n47\n14\n");
}
+
+ /**
+ * Test-only subclass of InvokeScriptedProcessor that counts calls to onPropertyModified() and,
+ * when onConfigurationRestored() runs, asserts that the count equals the number of supported
+ * property descriptors.
+ */
+ private static class OverrideInvokeScriptedProcessor extends
InvokeScriptedProcessor {
+
+ // Incremented once per onPropertyModified() call, after delegating to the superclass.
+ private int numTimesModifiedCalled = 0;
+
+ // Delegates to the superclass first, then checks the modification count.
+ // NOTE(review): a failure here raises AssertionError, which is also the type the test using
+ // this class expects from runner.run() - confirm the test cannot pass for the wrong reason.
+ @OnConfigurationRestored
+ @Override
+ public void onConfigurationRestored(ProcessContext context) {
+ super.onConfigurationRestored(context);
+ assertEquals(this.getSupportedPropertyDescriptors().size(),
numTimesModifiedCalled);
+ }
+
+ // Delegates to the superclass, then records that one more modification was observed.
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor,
final String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ numTimesModifiedCalled++;
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java
index bef3fb2..0efbc5f 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java
@@ -67,7 +67,6 @@ public class TestInvokeJython extends BaseScriptTest {
@Test
public void testInvalidThenFixed() {
final TestRunner runner = TestRunners.newTestRunner(new
InvokeScriptedProcessor());
- runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE,
"python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE,
"target/test/resources/jython/test_invalid.py");
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
index 737f737..45cd711 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@@ -28,6 +29,7 @@ import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.StandardConnection;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
@@ -47,6 +49,7 @@ import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
@@ -61,6 +64,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.StandardRemoteProcessGroup;
@@ -174,7 +178,10 @@ public class StatelessFlowManager extends
AbstractFlowManager implements FlowMan
}
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(extensionManager,
procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
procNode.getProcessor());
+ final StateManager stateManager =
statelessEngine.getStateManagerProvider().getStateManager(id);
+ final StandardProcessContext processContext = new
StandardProcessContext(procNode, statelessEngine.getControllerServiceProvider(),
+ statelessEngine.getPropertyEncryptor(), stateManager,
() -> false, new StatelessNodeTypeProvider());
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
procNode.getProcessor(), processContext);
}
LogRepositoryFactory.getRepository(procNode.getIdentifier()).setLogger(procNode.getLogger());
@@ -292,7 +299,7 @@ public class StatelessFlowManager extends
AbstractFlowManager implements FlowMan
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class,
taskNode.getReportingTask());
if (isFlowInitialized()) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
taskNode.getReportingTask());
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
taskNode.getReportingTask(), taskNode.getConfigurationContext());
}
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke
On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
@@ -343,7 +350,9 @@ public class StatelessFlowManager extends
AbstractFlowManager implements FlowMan
final ExtensionManager extensionManager =
statelessEngine.getExtensionManager();
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(extensionManager, service.getClass(),
service.getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
service);
+ final ConfigurationContext configurationContext =
+ new StandardConfigurationContext(serviceNode,
statelessEngine.getControllerServiceProvider(), null,
statelessEngine.getRootVariableRegistry());
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
service, configurationContext);
}
final ControllerService serviceImpl =
serviceNode.getControllerServiceImplementation();
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
index de4dfb4..238842f 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
@@ -102,6 +102,11 @@ public class StatelessReloadComponent implements
ReloadComponent {
// need to refresh the properties in case we are changing from ghost
component to real component
existingNode.refreshProperties();
+ // Notify the processor node that its configuration (e.g., properties) has been restored
+ final StandardProcessContext processContext = new
StandardProcessContext(existingNode,
statelessEngine.getControllerServiceProvider(),
+ statelessEngine.getPropertyEncryptor(),
statelessEngine.getStateManagerProvider().getStateManager(id), () -> false, new
StatelessNodeTypeProvider());
+ existingNode.onConfigurationRestored(processContext);
+
logger.debug("Successfully reloaded {}", existingNode);
}