This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new f40b7fc  NIFI-2215: Added support for onScheduled and onStopped in 
InvokeScriptedProcessor NIFI-2215: Incorporated review comments
f40b7fc is described below

commit f40b7fc6bad14a64bce104b3748baa95847a674c
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Mar 13 18:10:51 2019 -0400

    NIFI-2215: Added support for onScheduled and onStopped in 
InvokeScriptedProcessor
    NIFI-2215: Incorporated review comments
    
    This closes #3370
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../processors/script/InvokeScriptedProcessor.java | 35 ++++++++++++++++++++--
 .../src/test/resources/groovy/test_reader.groovy   | 19 +++++++++++-
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index c2df128..2790f83 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -68,7 +68,9 @@ import 
org.apache.nifi.script.impl.FilteredPropertiesValidationContextAdapter;
 @CapabilityDescription("Experimental - Invokes a script engine for a Processor 
defined in the given script. The script must define "
         + "a valid class that implements the Processor interface, and it must 
set a variable 'processor' to an instance of "
         + "the class. Processor methods such as onTrigger() will be delegated 
to the scripted Processor instance. Also any "
-        + "Relationships or PropertyDescriptors defined by the scripted 
processor will be added to the configuration dialog.  "
+        + "Relationships or PropertyDescriptors defined by the scripted 
processor will be added to the configuration dialog. The scripted processor can 
"
+        + "implement public void setLogger(ComponentLog logger) to get access 
to the parent logger, as well as public void onScheduled(ProcessContext 
context) and "
+        + "public void onStopped(ProcessContext context) methods to be invoked 
when the parent InvokeScriptedProcessor is scheduled or stopped, respectively.  
"
         + "Experimental: Impact of sustained usage not yet verified.")
 @DynamicProperty(name = "A script engine property to update", value = "The 
value to set it to",
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
@@ -205,6 +207,8 @@ public class InvokeScriptedProcessor extends 
AbstractSessionFactoryProcessor {
     public void setup(final ProcessContext context) {
         scriptingComponentHelper.setupVariables(context);
         setup();
+
+        invokeScriptedProcessorMethod("onScheduled", context);
     }
 
     public void setup() {
@@ -564,9 +568,36 @@ public class InvokeScriptedProcessor extends 
AbstractSessionFactoryProcessor {
     }
 
     @OnStopped
-    public void stop() {
+    public void stop(ProcessContext context) {
+        invokeScriptedProcessorMethod("onStopped", context);
         scriptingComponentHelper.stop();
         processor.set(null);
         scriptEngine = null;
     }
+
+    private void invokeScriptedProcessorMethod(String methodName, Object... 
params) {
+        // Run the scripted processor's method here, if it exists
+        if (scriptEngine instanceof Invocable) {
+            final Invocable invocable = (Invocable) scriptEngine;
+            final Object obj = scriptEngine.get("processor");
+            if (obj != null) {
+
+                ComponentLog logger = getLogger();
+                try {
+                    invocable.invokeMethod(obj, methodName, params);
+                } catch (final NoSuchMethodException nsme) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Configured script Processor does not 
contain the method " + methodName);
+                    }
+                } catch (final Exception e) {
+                    // An error occurred during onScheduled, propagate it up
+                    logger.error("Error while executing the scripted 
processor's method " + methodName, e);
+                    if (e instanceof ProcessException) {
+                        throw (ProcessException) e;
+                    }
+                    throw new ProcessException(e);
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
index 414b9de..5d54701 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
@@ -24,11 +24,28 @@ class GroovyProcessor implements Processor {
     def descriptor = new PropertyDescriptor.Builder()
             
.name("test-attribute").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
 
+    def logger
+
+    def setAttributeFromThisInOnScheduled = ''
+
     @Override
     void initialize(ProcessorInitializationContext context) {
 
     }
 
+    void setLogger(log) {
+        logger = log
+    }
+
+    void onScheduled(ProcessContext context) {
+        // Set the attribute value for use in onTrigger
+        setAttributeFromThisInOnScheduled = 'test content'
+    }
+
+    void onStopped(ProcessContext context) {
+        logger.info("Called onStopped")
+    }
+
     @Override
     Set<Relationship> getRelationships() {
         return [REL_TEST] as Set
@@ -41,7 +58,7 @@ class GroovyProcessor implements Processor {
         if (flowFile == null) {
             return;
         }
-        flowFile = session.putAttribute(flowFile, "from-content", "test 
content")
+        flowFile = session.putAttribute(flowFile, 'from-content', 
setAttributeFromThisInOnScheduled)
         // transfer
         session.transfer(flowFile, REL_TEST)
         session.commit()

Reply via email to