This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new f40b7fc NIFI-2215: Added support for onScheduled and onStopped in
InvokeScriptedProcessor NIFI-2215: Incorporated review comments
f40b7fc is described below
commit f40b7fc6bad14a64bce104b3748baa95847a674c
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Mar 13 18:10:51 2019 -0400
NIFI-2215: Added support for onScheduled and onStopped in
InvokeScriptedProcessor
NIFI-2215: Incorporated review comments
This closes #3370
Signed-off-by: Mike Thomsen <[email protected]>
---
.../processors/script/InvokeScriptedProcessor.java | 35 ++++++++++++++++++++--
.../src/test/resources/groovy/test_reader.groovy | 19 +++++++++++-
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index c2df128..2790f83 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -68,7 +68,9 @@ import org.apache.nifi.script.impl.FilteredPropertiesValidationContextAdapter;
@CapabilityDescription("Experimental - Invokes a script engine for a Processor defined in the given script. The script must define "
+ "a valid class that implements the Processor interface, and it must set a variable 'processor' to an instance of "
+ "the class. Processor methods such as onTrigger() will be delegated to the scripted Processor instance. Also any "
- + "Relationships or PropertyDescriptors defined by the scripted processor will be added to the configuration dialog. "
+ + "Relationships or PropertyDescriptors defined by the scripted processor will be added to the configuration dialog. The scripted processor can "
+ + "implement public void setLogger(ComponentLog logger) to get access to the parent logger, as well as public void onScheduled(ProcessContext context) and "
+ + "public void onStopped(ProcessContext context) methods to be invoked when the parent InvokeScriptedProcessor is scheduled or stopped, respectively. "
+ "Experimental: Impact of sustained usage not yet verified.")
@DynamicProperty(name = "A script engine property to update", value = "The value to set it to",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
@@ -205,6 +207,8 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
public void setup(final ProcessContext context) {
scriptingComponentHelper.setupVariables(context);
setup();
+
+ invokeScriptedProcessorMethod("onScheduled", context);
}
public void setup() {
@@ -564,9 +568,36 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
}
@OnStopped
- public void stop() {
+ public void stop(ProcessContext context) {
+ invokeScriptedProcessorMethod("onStopped", context);
scriptingComponentHelper.stop();
processor.set(null);
scriptEngine = null;
}
+
+ private void invokeScriptedProcessorMethod(String methodName, Object... params) {
+ // Run the scripted processor's method here, if it exists
+ if (scriptEngine instanceof Invocable) {
+ final Invocable invocable = (Invocable) scriptEngine;
+ final Object obj = scriptEngine.get("processor");
+ if (obj != null) {
+
+ ComponentLog logger = getLogger();
+ try {
+ invocable.invokeMethod(obj, methodName, params);
+ } catch (final NoSuchMethodException nsme) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Configured script Processor does not contain the method " + methodName);
+ }
+ } catch (final Exception e) {
+ // An error occurred during onScheduled, propagate it up
+ logger.error("Error while executing the scripted processor's method " + methodName, e);
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ }
+ throw new ProcessException(e);
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
index 414b9de..5d54701 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
@@ -24,11 +24,28 @@ class GroovyProcessor implements Processor {
def descriptor = new PropertyDescriptor.Builder()
.name("test-attribute").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
+ def logger
+
+ def setAttributeFromThisInOnScheduled = ''
+
@Override
void initialize(ProcessorInitializationContext context) {
}
+ void setLogger(log) {
+ logger = log
+ }
+
+ void onScheduled(ProcessContext context) {
+ // Set the attribute value for use in onTrigger
+ setAttributeFromThisInOnScheduled = 'test content'
+ }
+
+ void onStopped(ProcessContext context) {
+ logger.info("Called onStopped")
+ }
+
@Override
Set<Relationship> getRelationships() {
return [REL_TEST] as Set
@@ -41,7 +58,7 @@ class GroovyProcessor implements Processor {
if (flowFile == null) {
return;
}
- flowFile = session.putAttribute(flowFile, "from-content", "test content")
+ flowFile = session.putAttribute(flowFile, 'from-content', setAttributeFromThisInOnScheduled)
// transfer
session.transfer(flowFile, REL_TEST)
session.commit()