Repository: nifi
Updated Branches:
  refs/heads/master 3746ae258 -> d126743d9
NIFI-5437 - Yield in ExecuteScript when catching ScriptException This closes #2903 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d126743d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d126743d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d126743d Branch: refs/heads/master Commit: d126743d93f6d754774b0d3074f3464e01430eda Parents: 3746ae2 Author: Pierre Villard <pierre.villard...@gmail.com> Authored: Tue Jul 17 18:29:32 2018 +0200 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Tue Jul 17 19:58:33 2018 -0400 ---------------------------------------------------------------------- .../nifi/processors/script/ExecuteScript.java | 12 ++++++++++++ .../processors/script/TestExecuteGroovy.java | 20 +++++++++++++++----- .../resources/groovy/testScriptException.groovy | 18 ++++++++++++++++++ 3 files changed, 45 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d126743d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 9b241f3..8aa8199 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -19,9 +19,11 @@ package org.apache.nifi.processors.script; import 
org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -77,6 +79,7 @@ import java.util.Set; explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") } ) +@InputRequirement(Requirement.INPUT_ALLOWED) @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Scripts can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.") @SeeAlso({InvokeScriptedProcessor.class}) @@ -236,11 +239,20 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor { // class with InvokeScriptedProcessor session.commit(); } catch (ScriptException e) { + // The below 'session.rollback(true)' reverts any changes made during this session (all FlowFiles are + // restored back to their initial session state and back to their original queues after being penalized). + // However if the incoming relationship is full of flow files, this processor will keep failing and could + // cause resource exhaustion. In case a user does not want to yield, it can be set to 0s in the processor + // configuration. 
+ context.yield(); throw new ProcessException(e); } } catch (final Throwable t) { // Mimic AbstractProcessor behavior here getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); + + // the rollback might not penalize the incoming flow file if the exception is thrown before the user gets + // the flow file from the session binding (ff = session.get()). session.rollback(true); throw t; } finally { http://git-wip-us.apache.org/repos/asf/nifi/blob/d126743d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java index d7deeb7..449f37a 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestExecuteGroovy.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.script; import org.apache.nifi.script.ScriptingComponentUtils; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; import org.junit.Before; import org.junit.Test; @@ -26,6 +27,8 @@ import java.util.HashMap; import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestExecuteGroovy extends BaseScriptTest { @@ -274,19 +277,26 @@ public class TestExecuteGroovy extends BaseScriptTest { } /** - * Tests a script that throws an Exception within. 
The expected result is that the FlowFile will be routed to - * failure + * Tests a script that throws an Exception within. The expected result is that the flow file is rolled back + * and penalized. Besides we check that we yielded the processor. * * @throws Exception Any error encountered while testing */ - @Test(expected = AssertionError.class) + @Test public void testScriptException() throws Exception { runner.setValidateExpressionUsage(false); runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy"); - runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "throw new Exception()"); + runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(TEST_RESOURCE_LOCATION + "groovy/testScriptException.groovy")); runner.assertValid(); runner.enqueue("test content".getBytes(StandardCharsets.UTF_8)); - runner.run(); + try { + runner.run(); + fail(); + } catch (AssertionError e) { + runner.assertPenalizeCount(1); // penalized + runner.assertQueueNotEmpty(); // flow file back in the input queue + assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled()); // processor yielded + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d126743d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptException.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptException.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptException.groovy new file mode 100644 index 0000000..1998234 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptException.groovy @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+FlowFile flowFile = session.get()
+throw new Exception()
\ No newline at end of file