Repository: nifi
Updated Branches:
  refs/heads/master 3746ae258 -> d126743d9


NIFI-5437 - Yield in ExecuteScript when catching ScriptException

This closes #2903

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d126743d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d126743d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d126743d

Branch: refs/heads/master
Commit: d126743d93f6d754774b0d3074f3464e01430eda
Parents: 3746ae2
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Tue Jul 17 18:29:32 2018 +0200
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Tue Jul 17 19:58:33 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/script/ExecuteScript.java   | 12 ++++++++++++
 .../processors/script/TestExecuteGroovy.java    | 20 +++++++++++++++-----
 .../resources/groovy/testScriptException.groovy | 18 ++++++++++++++++++
 3 files changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d126743d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
index 9b241f3..8aa8199 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
@@ -19,9 +19,11 @@ package org.apache.nifi.processors.script;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
 import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -77,6 +79,7 @@ import java.util.Set;
                         explanation = "Provides operator the ability to 
execute arbitrary code assuming all permissions that NiFi has.")
         }
 )
+@InputRequirement(Requirement.INPUT_ALLOWED)
 @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER},
         description = "Scripts can store and retrieve state using the State 
Management APIs. Consult the State Manager section of the Developer's Guide for 
more details.")
 @SeeAlso({InvokeScriptedProcessor.class})
@@ -236,11 +239,20 @@ public class ExecuteScript extends 
AbstractSessionFactoryProcessor {
                 // class with InvokeScriptedProcessor
                 session.commit();
             } catch (ScriptException e) {
+                // The below 'session.rollback(true)' reverts any changes made 
during this session (all FlowFiles are
+                // restored back to their initial session state and back to 
their original queues after being penalized).
+                // However if the incoming relationship is full of flow files, 
this processor will keep failing and could
+                // cause resource exhaustion. In case a user does not want to 
yield, it can be set to 0s in the processor
+                // configuration.
+                context.yield();
                 throw new ProcessException(e);
             }
         } catch (final Throwable t) {
             // Mimic AbstractProcessor behavior here
             getLogger().error("{} failed to process due to {}; rolling back 
session", new Object[]{this, t});
+
+            // the rollback might not penalize the incoming flow file if the 
exception is thrown before the user gets
+            // the flow file from the session binding (ff = session.get()).
             session.rollback(true);
             throw t;
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/d126743d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java
index d7deeb7..449f37a 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.script;
 
 import org.apache.nifi.script.ScriptingComponentUtils;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -26,6 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 public class TestExecuteGroovy extends BaseScriptTest {
@@ -274,19 +277,26 @@ public class TestExecuteGroovy extends BaseScriptTest {
     }
 
     /**
-     * Tests a script that throws an Exception within. The expected result is 
that the FlowFile will be routed to
-     * failure
+     * Tests a script that throws an Exception within. The expected result is 
that the flow file is rolled back
+     * and penalized. Besides we check that we yielded the processor.
      *
      * @throws Exception Any error encountered while testing
      */
-    @Test(expected = AssertionError.class)
+    @Test
     public void testScriptException() throws Exception {
         runner.setValidateExpressionUsage(false);
         
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE,
 "Groovy");
-        runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "throw new 
Exception()");
+        runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, 
getFileContentsAsString(TEST_RESOURCE_LOCATION + 
"groovy/testScriptException.groovy"));
 
         runner.assertValid();
         runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
-        runner.run();
+        try {
+            runner.run();
+            fail();
+        } catch (AssertionError e) {
+            runner.assertPenalizeCount(1); // penalized
+            runner.assertQueueNotEmpty(); // flow file back in the input queue
+            assertTrue(((MockProcessContext) 
runner.getProcessContext()).isYieldCalled()); // processor yielded
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d126743d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptException.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptException.groovy
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptException.groovy
new file mode 100644
index 0000000..1998234
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptException.groovy
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+FlowFile flowFile = session.get()
+throw new Exception()
\ No newline at end of file

Reply via email to