NIFI-1458: Added ScriptedReportingTask. This closes #1045.
Signed-off-by: Andy LoPresto <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/675f4f54 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/675f4f54 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/675f4f54 Branch: refs/heads/master Commit: 675f4f544cb87bef0ec51b8dee5023088ad224ad Parents: 31ec01b Author: Matt Burgess <[email protected]> Authored: Fri Oct 14 13:41:37 2016 -0400 Committer: Andy LoPresto <[email protected]> Committed: Fri Jan 6 11:56:19 2017 -0800 ---------------------------------------------------------------------- .../src/main/resources/META-INF/NOTICE | 14 + .../nifi-scripting-processors/pom.xml | 4 + .../script/AbstractScriptProcessor.java | 316 ------------------ .../nifi/processors/script/ExecuteScript.java | 81 +++-- .../script/InvokeScriptedProcessor.java | 83 +++-- .../script/ScriptingComponentHelper.java | 318 +++++++++++++++++++ .../script/ScriptingComponentUtils.java | 67 ++++ .../reporting/script/ScriptedReportingTask.java | 208 ++++++++++++ .../org.apache.nifi.reporting.ReportingTask | 16 + .../script/ExecuteScriptGroovyTest.groovy | 12 +- .../ScriptedReportingTaskGroovyTest.groovy | 216 +++++++++++++ .../AccessibleScriptingComponentHelper.java | 24 ++ .../nifi/processors/script/BaseScriptTest.java | 22 +- .../processors/script/TestExecuteGroovy.java | 62 ++-- .../processors/script/TestExecuteJRuby.java | 6 +- .../script/TestExecuteJavascript.java | 6 +- .../processors/script/TestExecuteJython.java | 10 +- .../nifi/processors/script/TestExecuteLua.java | 6 +- .../processors/script/TestInvokeGroovy.java | 24 +- .../processors/script/TestInvokeJavascript.java | 26 +- .../processors/script/TestInvokeJython.java | 32 +- .../groovy/test_log_provenance_events.groovy | 24 ++ .../resources/groovy/test_log_vm_stats.groovy | 27 ++ 23 files changed, 1120 insertions(+), 484 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/src/main/resources/META-INF/NOTICE index ddd1770..769df95 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/src/main/resources/META-INF/NOTICE @@ -33,6 +33,20 @@ The following binary components are provided under the Apache Software License v This product includes software from the Spring Framework, under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + (ASLv2) Yammer Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2012 Coda Hale and Yammer, Inc. + + This product includes software developed by Coda Hale and Yammer, Inc. 
+ + This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released + with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ + ****************** Eclipse Public License v1.0 ****************** http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml index 0c1e865..0deae7a 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml @@ -66,6 +66,10 @@ <artifactId>commons-io</artifactId> </dependency> <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java deleted file mode 100644 index 56c6f0b..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java +++ 
/dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.script; - -import org.apache.nifi.annotation.behavior.Stateful; -import org.apache.nifi.components.state.Scope; -import org.apache.nifi.logging.ComponentLog; - -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.script.ScriptEngine; -import javax.script.ScriptEngineFactory; -import javax.script.ScriptEngineManager; -import javax.script.ScriptException; - -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; -import 
org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StringUtils; - -/** - * This class contains variables and methods common to scripting processors - */ -@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, - description = "Scripts can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.") -public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProcessor { - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles that were successfully processed") - .build(); - - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles that failed to be processed") - .build(); - - public static PropertyDescriptor SCRIPT_ENGINE; - - public static final PropertyDescriptor SCRIPT_FILE = new PropertyDescriptor.Builder() - .name("Script File") - .required(false) - .description("Path to script file to execute. Only one of Script File or Script Body may be used") - .addValidator(new StandardValidators.FileExistsValidator(true)) - .expressionLanguageSupported(true) - .build(); - - public static final PropertyDescriptor SCRIPT_BODY = new PropertyDescriptor.Builder() - .name("Script Body") - .required(false) - .description("Body of script to execute. 
Only one of Script File or Script Body may be used") - .addValidator(Validator.VALID) - .expressionLanguageSupported(false) - .build(); - - public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder() - .name("Module Directory") - .description("Comma-separated list of paths to files and/or directories which contain modules required by the script.") - .required(false) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - // A map from engine name to a custom configurator for that engine - protected final Map<String, ScriptEngineConfigurator> scriptEngineConfiguratorMap = new ConcurrentHashMap<>(); - protected final AtomicBoolean isInitialized = new AtomicBoolean(false); - - protected Map<String, ScriptEngineFactory> scriptEngineFactoryMap; - protected String scriptEngineName; - protected String scriptPath; - protected String scriptBody; - protected String[] modules; - protected List<PropertyDescriptor> descriptors; - - protected BlockingQueue<ScriptEngine> engineQ = null; - - /** - * Custom validation for ensuring exactly one of Script File or Script Body is populated - * - * @param validationContext provides a mechanism for obtaining externally - * managed values, such as property values and supplies convenience methods - * for operating on those values - * @return A collection of validation results - */ - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - Set<ValidationResult> results = new HashSet<>(); - - // Verify that exactly one of "script file" or "script body" is set - Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties(); - if (StringUtils.isEmpty(propertyMap.get(SCRIPT_FILE)) == StringUtils.isEmpty(propertyMap.get(SCRIPT_BODY))) { - results.add(new ValidationResult.Builder().valid(false).explanation( - "Exactly one of Script File or Script Body must be set").build()); - } - - return results; - } - - 
/** - * This method creates all resources needed for the script processor to function, such as script engines, - * script file reloader threads, etc. - */ - protected void createResources() { - descriptors = new ArrayList<>(); - // The following is required for JRuby, should be transparent to everything else. - // Note this is not done in a ScriptEngineConfigurator, as it is too early in the lifecycle. The - // setting must be there before the factories/engines are loaded. - System.setProperty("org.jruby.embed.localvariable.behavior", "persistent"); - - // Create list of available engines - ScriptEngineManager scriptEngineManager = new ScriptEngineManager(); - List<ScriptEngineFactory> scriptEngineFactories = scriptEngineManager.getEngineFactories(); - if (scriptEngineFactories != null) { - scriptEngineFactoryMap = new HashMap<>(scriptEngineFactories.size()); - List<AllowableValue> engineList = new LinkedList<>(); - for (ScriptEngineFactory factory : scriptEngineFactories) { - engineList.add(new AllowableValue(factory.getLanguageName())); - scriptEngineFactoryMap.put(factory.getLanguageName(), factory); - } - - // Sort the list by name so the list always looks the same. - Collections.sort(engineList, new Comparator<AllowableValue>() { - @Override - public int compare(AllowableValue o1, AllowableValue o2) { - if (o1 == null) { - return o2 == null ? 
0 : 1; - } - if (o2 == null) { - return -1; - } - return o1.getValue().compareTo(o2.getValue()); - } - }); - - AllowableValue[] engines = engineList.toArray(new AllowableValue[engineList.size()]); - - SCRIPT_ENGINE = new PropertyDescriptor.Builder() - .name("Script Engine") - .required(true) - .description("The engine to execute scripts") - .allowableValues(engines) - .defaultValue(engines[0].getValue()) - .required(true) - .expressionLanguageSupported(false) - .build(); - descriptors.add(SCRIPT_ENGINE); - } - - descriptors.add(SCRIPT_FILE); - descriptors.add(SCRIPT_BODY); - descriptors.add(MODULES); - - isInitialized.set(true); - } - - /** - * Determines whether the given path refers to a valid file - * - * @param path a path to a file - * @return true if the path refers to a valid file, false otherwise - */ - protected boolean isFile(final String path) { - return path != null && Files.isRegularFile(Paths.get(path)); - } - - /** - * Performs common setup operations when the processor is scheduled to run. This method assumes the member - * variables associated with properties have been filled. - * - * @param numberOfScriptEngines number of engines to setup - */ - public void setup(int numberOfScriptEngines) { - - if (scriptEngineConfiguratorMap.isEmpty()) { - ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader = - ServiceLoader.load(ScriptEngineConfigurator.class); - for (ScriptEngineConfigurator configurator : configuratorServiceLoader) { - scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator); - } - } - setupEngines(numberOfScriptEngines); - } - - /** - * Configures the specified script engine. First, the engine is loaded and instantiated using the JSR-223 - * javax.script APIs. Then, if any script configurators have been defined for this engine, their init() method is - * called, and the configurator is saved for future calls. 
- * - * @param numberOfScriptEngines number of engines to setup - * @see org.apache.nifi.processors.script.ScriptEngineConfigurator - */ - protected void setupEngines(int numberOfScriptEngines) { - engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines); - ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); - try { - ComponentLog log = getLogger(); - - if (StringUtils.isBlank(scriptEngineName)) { - throw new IllegalArgumentException("The script engine name cannot be null"); - } - - ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); - - // Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs - URL[] additionalClasspathURLs = null; - if (configurator != null) { - additionalClasspathURLs = configurator.getModuleURLsForClasspath(modules, log); - } else { - if (modules != null) { - List<URL> urls = new LinkedList<>(); - for (String modulePathString : modules) { - try { - urls.add(new File(modulePathString).toURI().toURL()); - } catch (MalformedURLException mue) { - log.error("{} is not a valid file, ignoring", new Object[]{modulePathString}, mue); - } - } - additionalClasspathURLs = urls.toArray(new URL[urls.size()]); - } - } - - // Need the right classloader when the engine is created. This ensures the NAR's execution class loader - // (plus the module path) becomes the parent for the script engine - ClassLoader scriptEngineModuleClassLoader = additionalClasspathURLs != null - ? 
new URLClassLoader(additionalClasspathURLs, originalContextClassLoader) - : originalContextClassLoader; - if (scriptEngineModuleClassLoader != null) { - Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); - } - - for (int i = 0; i < numberOfScriptEngines; i++) { - ScriptEngine scriptEngine = createScriptEngine(); - try { - if (configurator != null) { - configurator.init(scriptEngine, modules); - } - if (!engineQ.offer(scriptEngine)) { - log.error("Error adding script engine {}", new Object[]{scriptEngine.getFactory().getEngineName()}); - } - - } catch (ScriptException se) { - log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); - if (log.isDebugEnabled()) { - log.error("Error initializing script engine configurator", se); - } - } - } - } finally { - // Restore original context class loader - Thread.currentThread().setContextClassLoader(originalContextClassLoader); - } - } - - /** - * Provides a ScriptEngine corresponding to the currently selected script engine name. - * ScriptEngineManager.getEngineByName() doesn't use find ScriptEngineFactory.getName(), which - * is what we used to populate the list. So just search the list of factories until a match is - * found, then create and return a script engine. - * - * @return a Script Engine corresponding to the currently specified name, or null if none is found. 
- */ - protected ScriptEngine createScriptEngine() { - // - ScriptEngineFactory factory = scriptEngineFactoryMap.get(scriptEngineName); - if (factory == null) { - return null; - } - return factory.getScriptEngine(); - } - - @OnStopped - public void stop() { - if (engineQ != null) { - engineQ.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index b2bc8ef..9be8d0c 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -16,22 +16,30 @@ */ package org.apache.nifi.processors.script; + import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.Restricted; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.logging.ComponentLog; +import 
org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StringUtils; +import java.nio.charset.Charset; import javax.script.Bindings; import javax.script.ScriptContext; import javax.script.ScriptEngine; @@ -39,6 +47,7 @@ import javax.script.ScriptException; import javax.script.SimpleBindings; import java.io.FileInputStream; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -57,9 +66,18 @@ import java.util.Set; description = "Updates a script engine property specified by the Dynamic Property's key with the value " + "specified by the Dynamic Property's value") @Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") -public class ExecuteScript extends AbstractScriptProcessor { +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, + description = "Scripts can store and retrieve state using the State Management APIs. 
Consult the State Manager section of the Developer's Guide for more details.") +@SeeAlso({InvokeScriptedProcessor.class}) +public class ExecuteScript extends AbstractSessionFactoryProcessor { + + // Constants maintained for backwards compatibility + public static final Relationship REL_SUCCESS = ScriptingComponentUtils.REL_SUCCESS; + public static final Relationship REL_FAILURE = ScriptingComponentUtils.REL_FAILURE; private String scriptToRun = null; + volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper(); + /** * Returns the valid relationships for this processor. @@ -83,13 +101,13 @@ public class ExecuteScript extends AbstractScriptProcessor { */ @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - synchronized (isInitialized) { - if (!isInitialized.get()) { - createResources(); + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); } } - return Collections.unmodifiableList(descriptors); + return Collections.unmodifiableList(scriptingComponentHelper.getDescriptors()); } /** @@ -110,6 +128,10 @@ public class ExecuteScript extends AbstractScriptProcessor { .build(); } + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + return scriptingComponentHelper.customValidate(validationContext); + } /** * Performs setup operations when the processor is scheduled to run. 
This includes evaluating the processor's @@ -119,30 +141,22 @@ public class ExecuteScript extends AbstractScriptProcessor { */ @OnScheduled public void setup(final ProcessContext context) { - scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue(); - scriptPath = context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue(); - scriptBody = context.getProperty(SCRIPT_BODY).getValue(); - String modulePath = context.getProperty(MODULES).getValue(); - if (!StringUtils.isEmpty(modulePath)) { - modules = modulePath.split(","); - } else { - modules = new String[0]; - } + scriptingComponentHelper.setupVariables(context); + // Create a script engine for each possible task int maxTasks = context.getMaxConcurrentTasks(); - super.setup(maxTasks); - scriptToRun = scriptBody; + scriptingComponentHelper.setup(maxTasks, getLogger()); + scriptToRun = scriptingComponentHelper.getScriptBody(); try { - if (scriptToRun == null && scriptPath != null) { - try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) { - scriptToRun = IOUtils.toString(scriptStream); + if (scriptToRun == null && scriptingComponentHelper.getScriptPath() != null) { + try (final FileInputStream scriptStream = new FileInputStream(scriptingComponentHelper.getScriptPath())) { + scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset()); } } } catch (IOException ioe) { throw new ProcessException(ioe); } - } /** @@ -158,12 +172,12 @@ public class ExecuteScript extends AbstractScriptProcessor { */ @Override public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { - synchronized (isInitialized) { - if (!isInitialized.get()) { - createResources(); + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); } } - ScriptEngine scriptEngine = engineQ.poll(); + ScriptEngine scriptEngine = 
scriptingComponentHelper.engineQ.poll(); ComponentLog log = getLogger(); if (scriptEngine == null) { // No engine available so nothing more to do here @@ -197,11 +211,11 @@ public class ExecuteScript extends AbstractScriptProcessor { // Execute any engine-specific configuration before the script is evaluated ScriptEngineConfigurator configurator = - scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); + scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); // Evaluate the script with the configurator (if it exists) or the engine if (configurator != null) { - configurator.eval(scriptEngine, scriptToRun, modules); + configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules()); } else { scriptEngine.eval(scriptToRun); } @@ -219,7 +233,12 @@ public class ExecuteScript extends AbstractScriptProcessor { session.rollback(true); throw t; } finally { - engineQ.offer(scriptEngine); + scriptingComponentHelper.engineQ.offer(scriptEngine); } } + + @OnStopped + public void stop() { + scriptingComponentHelper.stop(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 53219ac..fde4bb6 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -20,6 +20,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -28,9 +29,11 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; @@ -44,6 +47,7 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.io.File; import java.io.FileInputStream; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -61,9 +65,11 @@ import java.util.concurrent.atomic.AtomicReference; + "Experimental: Impact of sustained usage not yet verified.") @DynamicProperty(name = "A script engine property to update", value = "The value to set it to", supportsExpressionLanguage = true, description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value") +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, + description = "Scripts 
can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.") @SeeAlso({ExecuteScript.class}) @Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") -public class InvokeScriptedProcessor extends AbstractScriptProcessor { +public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { private final AtomicReference<Processor> processor = new AtomicReference<>(); private final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>()); @@ -74,6 +80,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { private volatile String kerberosServicePrincipal = null; private volatile File kerberosConfigFile = null; private volatile File kerberosServiceKeytab = null; + volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper(); /** * Returns the valid relationships for this processor as supplied by the @@ -123,13 +130,13 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - synchronized (isInitialized) { - if (!isInitialized.get()) { - createResources(); + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); } } List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(); - supportedPropertyDescriptors.addAll(descriptors); + supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors()); final Processor instance = processor.get(); if (instance != null) { @@ -182,23 +189,15 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { */ @OnScheduled public void setup(final ProcessContext context) { - scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue(); - scriptPath = 
context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue(); - scriptBody = context.getProperty(SCRIPT_BODY).getValue(); - String modulePath = context.getProperty(MODULES).getValue(); - if (!StringUtils.isEmpty(modulePath)) { - modules = modulePath.split(","); - } else { - modules = new String[0]; - } + scriptingComponentHelper.setupVariables(context); setup(); } public void setup() { // Create a single script engine, the Processor object is reused by each task if(scriptEngine == null) { - super.setup(1); - scriptEngine = engineQ.poll(); + scriptingComponentHelper.setup(1, getLogger()); + scriptEngine = scriptingComponentHelper.engineQ.poll(); } if (scriptEngine == null) { @@ -206,10 +205,10 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { } if (scriptNeedsReload.get() || processor.get() == null) { - if (isFile(scriptPath)) { - reloadScriptFile(scriptPath); + if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { + reloadScriptFile(scriptingComponentHelper.getScriptPath()); } else { - reloadScriptBody(scriptBody); + reloadScriptBody(scriptingComponentHelper.getScriptBody()); } scriptNeedsReload.set(false); } @@ -228,13 +227,13 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { final ComponentLog logger = getLogger(); final Processor instance = processor.get(); - if (SCRIPT_FILE.equals(descriptor) - || SCRIPT_BODY.equals(descriptor) - || MODULES.equals(descriptor) - || SCRIPT_ENGINE.equals(descriptor)) { + if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor) + || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor) + || ScriptingComponentUtils.MODULES.equals(descriptor) + || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { scriptNeedsReload.set(true); // Need to reset scriptEngine if the value has changed - if (SCRIPT_ENGINE.equals(descriptor)) { + if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { scriptEngine = null; } } else if (instance != 
null) { @@ -258,7 +257,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { final Collection<ValidationResult> results = new HashSet<>(); try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) { - return reloadScript(IOUtils.toString(scriptStream)); + return reloadScript(IOUtils.toString(scriptStream, Charset.defaultCharset())); } catch (final Exception e) { final ComponentLog logger = getLogger(); @@ -300,7 +299,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { .subject("ScriptValidation") .valid(false) .explanation("Unable to load script due to " + e) - .input(scriptPath) + .input(scriptingComponentHelper.getScriptPath()) .build()); } @@ -329,9 +328,9 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { final Invocable invocable = (Invocable) scriptEngine; // Find a custom configurator and invoke their eval() method - ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); + ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); if (configurator != null) { - configurator.eval(scriptEngine, scriptBody, modules); + configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); } else { // evaluate the script scriptEngine.eval(scriptBody); @@ -412,7 +411,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { .subject("ScriptValidation") .valid(false) .explanation("Unable to load script due to " + ex.getLocalizedMessage()) - .input(scriptPath) + .input(scriptingComponentHelper.getScriptPath()) .build()); } @@ -442,14 +441,14 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { return commonValidationResults; } - scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue(); - scriptPath = context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue(); 
- scriptBody = context.getProperty(SCRIPT_BODY).getValue(); - String modulePath = context.getProperty(MODULES).getValue(); + scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue()); + scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue()); + scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue()); + String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).getValue(); if (!StringUtils.isEmpty(modulePath)) { - modules = modulePath.split(","); + scriptingComponentHelper.setModules(modulePath.split(",")); } else { - modules = new String[0]; + scriptingComponentHelper.setModules(new String[0]); } setup(); @@ -477,7 +476,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { .subject("Validation") .valid(false) .explanation("An error occurred calling validate in the configured script Processor.") - .input(context.getProperty(SCRIPT_FILE).getValue()) + .input(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).getValue()) .build()); return results; } @@ -505,9 +504,9 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { // Initialize the rest of the processor resources if we have not already done so - synchronized (isInitialized) { - if (!isInitialized.get()) { - super.createResources(); + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); } } @@ -529,7 +528,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { // run the processor instance.onTrigger(context, sessionFactory); } catch (final ProcessException e) { - final String message = String.format("An error occurred executing the 
configured Processor [%s]: %s", context.getProperty(SCRIPT_FILE).getValue(), e); + final String message = String.format("An error occurred executing the configured Processor [%s]: %s", + context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).getValue(), e); log.error(message); throw e; } @@ -539,9 +539,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { } @OnStopped - @Override public void stop() { - super.stop(); + scriptingComponentHelper.stop(); processor.set(null); scriptEngine = null; } http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentHelper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentHelper.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentHelper.java new file mode 100644 index 0000000..9edad98 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentHelper.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.script; + +import org.apache.nifi.logging.ComponentLog; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineFactory; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.StringUtils; + +/** + * This class contains variables and methods common to scripting processors, reporting tasks, etc. 
+ */ +public class ScriptingComponentHelper { + + public PropertyDescriptor SCRIPT_ENGINE; + + // A map from engine name to a custom configurator for that engine + public final Map<String, ScriptEngineConfigurator> scriptEngineConfiguratorMap = new ConcurrentHashMap<>(); + public final AtomicBoolean isInitialized = new AtomicBoolean(false); + + public Map<String, ScriptEngineFactory> scriptEngineFactoryMap; + private String scriptEngineName; + private String scriptPath; + private String scriptBody; + private String[] modules; + private List<PropertyDescriptor> descriptors; + + public BlockingQueue<ScriptEngine> engineQ = null; + + public String getScriptEngineName() { + return scriptEngineName; + } + + public void setScriptEngineName(String scriptEngineName) { + this.scriptEngineName = scriptEngineName; + } + + public String getScriptPath() { + return scriptPath; + } + + public void setScriptPath(String scriptPath) { + this.scriptPath = scriptPath; + } + + public String getScriptBody() { + return scriptBody; + } + + public void setScriptBody(String scriptBody) { + this.scriptBody = scriptBody; + } + + public String[] getModules() { + return modules; + } + + public void setModules(String[] modules) { + this.modules = modules; + } + + public List<PropertyDescriptor> getDescriptors() { + return descriptors; + } + + public void setDescriptors(List<PropertyDescriptor> descriptors) { + this.descriptors = descriptors; + } + + /** + * Custom validation for ensuring exactly one of Script File or Script Body is populated + * + * @param validationContext provides a mechanism for obtaining externally + * managed values, such as property values and supplies convenience methods + * for operating on those values + * @return A collection of validation results + */ + public Collection<ValidationResult> customValidate(ValidationContext validationContext) { + Set<ValidationResult> results = new HashSet<>(); + + // Verify that exactly one of "script file" or "script body" is set + 
Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties(); + if (StringUtils.isEmpty(propertyMap.get(ScriptingComponentUtils.SCRIPT_FILE)) == StringUtils.isEmpty(propertyMap.get(ScriptingComponentUtils.SCRIPT_BODY))) { + results.add(new ValidationResult.Builder().valid(false).explanation( + "Exactly one of Script File or Script Body must be set").build()); + } + + return results; + } + + /** + * This method creates all resources needed for the script processor to function, such as script engines, + * script file reloader threads, etc. + */ + public void createResources() { + descriptors = new ArrayList<>(); + // The following is required for JRuby, should be transparent to everything else. + // Note this is not done in a ScriptEngineConfigurator, as it is too early in the lifecycle. The + // setting must be there before the factories/engines are loaded. + System.setProperty("org.jruby.embed.localvariable.behavior", "persistent"); + + // Create list of available engines + ScriptEngineManager scriptEngineManager = new ScriptEngineManager(); + List<ScriptEngineFactory> scriptEngineFactories = scriptEngineManager.getEngineFactories(); + if (scriptEngineFactories != null) { + scriptEngineFactoryMap = new HashMap<>(scriptEngineFactories.size()); + List<AllowableValue> engineList = new LinkedList<>(); + for (ScriptEngineFactory factory : scriptEngineFactories) { + engineList.add(new AllowableValue(factory.getLanguageName())); + scriptEngineFactoryMap.put(factory.getLanguageName(), factory); + } + + // Sort the list by name so the list always looks the same. + Collections.sort(engineList, (o1, o2) -> { + if (o1 == null) { + return o2 == null ? 
0 : 1; + } + if (o2 == null) { + return -1; + } + return o1.getValue().compareTo(o2.getValue()); + }); + + AllowableValue[] engines = engineList.toArray(new AllowableValue[engineList.size()]); + + SCRIPT_ENGINE = new PropertyDescriptor.Builder() + .name("Script Engine") + .required(true) + .description("The engine to execute scripts") + .allowableValues(engines) + .defaultValue(engines[0].getValue()) + .required(true) + .expressionLanguageSupported(false) + .build(); + descriptors.add(SCRIPT_ENGINE); + } + + descriptors.add(ScriptingComponentUtils.SCRIPT_FILE); + descriptors.add(ScriptingComponentUtils.SCRIPT_BODY); + descriptors.add(ScriptingComponentUtils.MODULES); + + isInitialized.set(true); + } + + /** + * Determines whether the given path refers to a valid file + * + * @param path a path to a file + * @return true if the path refers to a valid file, false otherwise + */ + public static boolean isFile(final String path) { + return path != null && Files.isRegularFile(Paths.get(path)); + } + + /** + * Performs common setup operations when the processor is scheduled to run. This method assumes the member + * variables associated with properties have been filled. + * + * @param numberOfScriptEngines number of engines to setup + */ + public void setup(int numberOfScriptEngines, ComponentLog log) { + + if (scriptEngineConfiguratorMap.isEmpty()) { + ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader = + ServiceLoader.load(ScriptEngineConfigurator.class); + for (ScriptEngineConfigurator configurator : configuratorServiceLoader) { + scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator); + } + } + setupEngines(numberOfScriptEngines, log); + } + + /** + * Configures the specified script engine. First, the engine is loaded and instantiated using the JSR-223 + * javax.script APIs. 
Then, if any script configurators have been defined for this engine, their init() method is + * called, and the configurator is saved for future calls. + * + * @param numberOfScriptEngines number of engines to setup + * @see org.apache.nifi.processors.script.ScriptEngineConfigurator + */ + protected void setupEngines(int numberOfScriptEngines, ComponentLog log) { + engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines); + ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + if (StringUtils.isBlank(scriptEngineName)) { + throw new IllegalArgumentException("The script engine name cannot be null"); + } + + ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); + + // Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs + URL[] additionalClasspathURLs = null; + if (configurator != null) { + additionalClasspathURLs = configurator.getModuleURLsForClasspath(modules, log); + } else { + if (modules != null) { + List<URL> urls = new LinkedList<>(); + for (String modulePathString : modules) { + try { + urls.add(new File(modulePathString).toURI().toURL()); + } catch (MalformedURLException mue) { + log.error("{} is not a valid file, ignoring", new Object[]{modulePathString}, mue); + } + } + additionalClasspathURLs = urls.toArray(new URL[urls.size()]); + } + } + + // Need the right classloader when the engine is created. This ensures the NAR's execution class loader + // (plus the module path) becomes the parent for the script engine + ClassLoader scriptEngineModuleClassLoader = additionalClasspathURLs != null + ? 
new URLClassLoader(additionalClasspathURLs, originalContextClassLoader) + : originalContextClassLoader; + if (scriptEngineModuleClassLoader != null) { + Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); + } + + for (int i = 0; i < numberOfScriptEngines; i++) { + ScriptEngine scriptEngine = createScriptEngine(); + try { + if (configurator != null) { + configurator.init(scriptEngine, modules); + } + if (!engineQ.offer(scriptEngine)) { + log.error("Error adding script engine {}", new Object[]{scriptEngine.getFactory().getEngineName()}); + } + + } catch (ScriptException se) { + log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); + if (log.isDebugEnabled()) { + log.error("Error initializing script engine configurator", se); + } + } + } + } finally { + // Restore original context class loader + Thread.currentThread().setContextClassLoader(originalContextClassLoader); + } + } + + void setupVariables(ProcessContext context) { + scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue(); + scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue(); + scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue(); + String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).getValue(); + if (!StringUtils.isEmpty(modulePath)) { + modules = modulePath.split(","); + } else { + modules = new String[0]; + } + } + + /** + * Provides a ScriptEngine corresponding to the currently selected script engine name. + * ScriptEngineManager.getEngineByName() does not match on ScriptEngineFactory.getLanguageName(), which + * is what we used to populate the list. So look up the factory that was registered under the selected + * name, then create and return a script engine from it. + * + * @return a Script Engine corresponding to the currently specified name, or null if none is found. 
+ */ + protected ScriptEngine createScriptEngine() { + // + ScriptEngineFactory factory = scriptEngineFactoryMap.get(scriptEngineName); + if (factory == null) { + return null; + } + return factory.getScriptEngine(); + } + + void stop() { + if (engineQ != null) { + engineQ.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentUtils.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentUtils.java new file mode 100644 index 0000000..ac9e778 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.script; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +/** + * Utility methods and constants used by the scripting components. + */ +public class ScriptingComponentUtils { + /** A relationship indicating flow files were processed successfully */ + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that were successfully processed") + .build(); + + /** A relationship indicating an error while processing flow files */ + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to be processed") + .build(); + + /** A property descriptor for specifying the location of a script file */ + public static final PropertyDescriptor SCRIPT_FILE = new PropertyDescriptor.Builder() + .name("Script File") + .required(false) + .description("Path to script file to execute. Only one of Script File or Script Body may be used") + .addValidator(new StandardValidators.FileExistsValidator(true)) + .expressionLanguageSupported(true) + .build(); + + /** A property descriptor for specifying the body of a script */ + public static final PropertyDescriptor SCRIPT_BODY = new PropertyDescriptor.Builder() + .name("Script Body") + .required(false) + .description("Body of script to execute. 
Only one of Script File or Script Body may be used") + .addValidator(Validator.VALID) + .expressionLanguageSupported(false) + .build(); + + /** A property descriptor for specifying the location of additional modules to be used by the script */ + public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder() + .name("Module Directory") + .description("Comma-separated list of paths to files and/or directories which contain modules required by the script.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); +} + http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java new file mode 100644 index 0000000..b2873d4 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.script; + +import com.yammer.metrics.core.VirtualMachineMetrics; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.script.ScriptEngineConfigurator; +import org.apache.nifi.processors.script.ScriptingComponentHelper; +import org.apache.nifi.processors.script.ScriptingComponentUtils; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.util.StringUtils; + +import javax.script.Bindings; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import javax.script.SimpleBindings; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + 
* A Reporting task whose body is provided by a script (via supported JSR-223 script engines) + */ +@Tags({"reporting", "script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"}) +@CapabilityDescription("Provides reporting and status information to a script. ReportingContext, ComponentLog, and VirtualMachineMetrics objects are made available " + + "as variables (context, log, and vmMetrics, respectively) to the script for further processing. The context makes various information available such " + + "as events, provenance, bulletins, controller services, process groups, Java Virtual Machine metrics, etc.") +@DynamicProperty( + name = "A script engine property to update", + value = "The value to set it to", + supportsExpressionLanguage = true, + description = "Updates a script engine property specified by the Dynamic Property's key with the value " + + "specified by the Dynamic Property's value") +@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") +public class ScriptedReportingTask extends AbstractReportingTask { + + protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper(); + private volatile String scriptToRun = null; + private volatile VirtualMachineMetrics vmMetrics; + + /** + * Returns a list of property descriptors supported by this reporting task. The list is built by the scripting + * component helper and contains the script engine, script file, script body, and module directory properties. + * The helper's resources are created on first use if they have not been initialized yet. 
+ * + * @return a List of PropertyDescriptor objects supported by this reporting task + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); + } + } + + return Collections.unmodifiableList(scriptingComponentHelper.getDescriptors()); + } + + /** + * Returns a PropertyDescriptor for the given name. This is for the user to be able to define their own properties + * which will be available as variables in the script. + * + * @param propertyDescriptorName the name of the user-defined (dynamic) property to build a descriptor for + * @return a PropertyDescriptor object corresponding to the specified dynamic property name + */ + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + return scriptingComponentHelper.customValidate(validationContext); + } + + /** + * Performs setup operations when the reporting task is scheduled to run. 
This includes evaluating the reporting task's + * properties, as well as reloading the script (from file or the "Script Body" property) + * + * @param context the context in which to perform the setup operations + */ + @OnScheduled + public void setup(final ConfigurationContext context) { + scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue()); + scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue()); + scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue()); + String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).getValue(); + if (!StringUtils.isEmpty(modulePath)) { + scriptingComponentHelper.setModules(modulePath.split(",")); + } else { + scriptingComponentHelper.setModules(new String[0]); + } + // Create a single script engine for this reporting task + scriptingComponentHelper.setup(1, getLogger()); + scriptToRun = scriptingComponentHelper.getScriptBody(); + + try { + String scriptPath = scriptingComponentHelper.getScriptPath(); + if (scriptToRun == null && scriptPath != null) { + try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) { + scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset()); + } + } + } catch (IOException ioe) { + throw new ProcessException(ioe); + } + + vmMetrics = VirtualMachineMetrics.getInstance(); + } + + @Override + public void onTrigger(final ReportingContext context) { + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); + } + } + ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll(); + ComponentLog log = getLogger(); + if (scriptEngine == null) { + // No engine available so nothing more to do here + return; + } + + try { + + try { + Bindings bindings = 
scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE); + if (bindings == null) { + bindings = new SimpleBindings(); + } + bindings.put("context", context); + bindings.put("log", log); + bindings.put("vmMetrics", vmMetrics); + + // Find the user-added properties and set them on the script + for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) { + if (property.getKey().isDynamic()) { + // Add the dynamic property bound to its full PropertyValue to the script engine + if (property.getValue() != null) { + bindings.put(property.getKey().getName(), context.getProperty(property.getKey())); + } + } + } + + scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE); + + // Execute any engine-specific configuration before the script is evaluated + ScriptEngineConfigurator configurator = + scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); + + // Evaluate the script with the configurator (if it exists) or the engine + if (configurator != null) { + configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules()); + } else { + scriptEngine.eval(scriptToRun); + } + } catch (ScriptException e) { + throw new ProcessException(e); + } + } catch (final Throwable t) { + // Mimic AbstractProcessor behavior here + getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); + throw t; + } finally { + scriptingComponentHelper.engineQ.offer(scriptEngine); + } + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask new file mode 100644 index 0000000..06e1d4e --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.nifi.reporting.script.ScriptedReportingTask \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy index 86f119a..0302616 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -49,9 +49,9 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { super.setupExecuteScript() runner.setValidateExpressionUsage(false) - runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") - runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") - runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy") + runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "groovy") } @After @@ -65,9 +65,9 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { assertNotNull(executeScript.getSupportedPropertyDescriptors()) runner = TestRunners.newTestRunner(executeScript) runner.setValidateExpressionUsage(false) - 
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") - runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") - runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy") + runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "groovy") // Override userContext value runner.processContext.maxConcurrentTasks = poolSize http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskGroovyTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskGroovyTest.groovy new file mode 100644 index 0000000..085c054 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskGroovyTest.groovy @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.script + +import org.apache.commons.io.FileUtils +import org.apache.nifi.components.PropertyDescriptor +import org.apache.nifi.components.PropertyValue +import org.apache.nifi.controller.ConfigurationContext +import org.apache.nifi.logging.ComponentLog +import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper +import org.apache.nifi.processors.script.ScriptingComponentHelper +import org.apache.nifi.processors.script.ScriptingComponentUtils +import org.apache.nifi.provenance.ProvenanceEventBuilder +import org.apache.nifi.provenance.ProvenanceEventRecord +import org.apache.nifi.provenance.ProvenanceEventRepository +import org.apache.nifi.provenance.ProvenanceEventType +import org.apache.nifi.provenance.StandardProvenanceEventRecord +import org.apache.nifi.reporting.EventAccess +import org.apache.nifi.reporting.ReportingContext +import org.apache.nifi.reporting.ReportingInitializationContext +import org.apache.nifi.state.MockStateManager +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.MockPropertyValue +import org.apache.nifi.util.TestRunners +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.mockito.Mockito +import org.mockito.stubbing.Answer +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static org.junit.Assert.assertEquals +import static org.junit.Assert.assertTrue +import static org.mockito.Mockito.any +import static org.mockito.Mockito.doAnswer 
+import static org.mockito.Mockito.mock +import static org.mockito.Mockito.when + + +/** + * Unit tests for ScriptedReportingTask. + */ +@RunWith(JUnit4.class) +class ScriptedReportingTaskGroovyTest { + private static final Logger logger = LoggerFactory.getLogger(ScriptedReportingTaskGroovyTest) + def task + def runner + def scriptingComponent + + + @BeforeClass + static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File) + } + + @Before + void setUp() { + task = new MockScriptedReportingTask() + runner = TestRunners + scriptingComponent = (AccessibleScriptingComponentHelper) task + } + + @Test + void testProvenanceGroovyScript() { + def uuid = "10000000-0000-0000-0000-000000000000" + def attributes = ['abc': 'xyz', 'xyz': 'abc', 'filename': "file-$uuid", 'uuid': uuid] + def prevAttrs = ['filename': '1234.xyz'] + + def flowFile = new MockFlowFile(3L) + flowFile.putAttributes(attributes) + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.eventTime = System.currentTimeMillis() + builder.eventType = ProvenanceEventType.RECEIVE + builder.transitUri = 'nifi://unit-test' + builder.setAttributes(prevAttrs, attributes) + builder.componentId = '1234' + builder.componentType = 'dummy processor' + builder.fromFlowFile(flowFile) + final ProvenanceEventRecord event = builder.build() + + def properties = task.supportedPropertyDescriptors.collectEntries { descriptor -> + [(descriptor): descriptor.getDefaultValue()] // parenthesized so the PropertyDescriptor itself is the key, not the literal string 'descriptor' + } + + // Mock the ConfigurationContext for setup(...) 
+ def configurationContext = mock(ConfigurationContext) + when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE)) + .thenReturn(new MockPropertyValue('Groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE)) + .thenReturn(new MockPropertyValue('target/test/resources/groovy/test_log_provenance_events.groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY)) + .thenReturn(new MockPropertyValue(null)) + when(configurationContext.getProperty(ScriptingComponentUtils.MODULES)) + .thenReturn(new MockPropertyValue(null)) + + // Set up ReportingContext + def context = mock(ReportingContext) + when(context.getStateManager()).thenReturn(new MockStateManager(task)) + doAnswer({ invocation -> + def descriptor = invocation.getArgumentAt(0, PropertyDescriptor) + return new MockPropertyValue(properties[descriptor]) + } as Answer<PropertyValue> + ).when(context).getProperty(any(PropertyDescriptor)) + + + def eventAccess = mock(EventAccess) + // Return 3 events for the test + doAnswer({ invocation -> return [event, event, event] } as Answer<List<ProvenanceEventRecord>> + ).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt()) + + def provenanceRepository = mock(ProvenanceEventRepository.class) + doAnswer({ invocation -> return 3 } as Answer<Long> + ).when(provenanceRepository).getMaxEventId() + + when(context.getEventAccess()).thenReturn(eventAccess); + when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository) + + def logger = mock(ComponentLog) + def initContext = mock(ReportingInitializationContext) + when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()) + when(initContext.getLogger()).thenReturn(logger) + + task.initialize initContext + task.setup configurationContext + task.onTrigger context + + // This script should return a variable x with the number of events and a variable e with the first event + def se = 
task.scriptEngine + assertEquals 3, se.x + assertEquals '1234', se.e.componentId + assertEquals 'xyz', se.e.attributes.abc + task.offerScriptEngine(se) + + } + + @Test + void testVMEventsGroovyScript() { + + def properties = [:] as Map<PropertyDescriptor, String> + task.getSupportedPropertyDescriptors().each { PropertyDescriptor descriptor -> + properties.put(descriptor, descriptor.getDefaultValue()) + } + + // Mock the ConfigurationContext for setup(...) + def configurationContext = mock(ConfigurationContext) + when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE)) + .thenReturn(new MockPropertyValue('Groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE)) + .thenReturn(new MockPropertyValue('target/test/resources/groovy/test_log_vm_stats.groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY)) + .thenReturn(new MockPropertyValue(null)) + when(configurationContext.getProperty(ScriptingComponentUtils.MODULES)) + .thenReturn(new MockPropertyValue(null)) + + // Set up ReportingContext + def context = mock(ReportingContext) + when(context.getStateManager()).thenReturn(new MockStateManager(task)) + doAnswer({ invocation -> + PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor) + return new MockPropertyValue(properties[descriptor]) + } as Answer<PropertyValue> + ).when(context).getProperty(any(PropertyDescriptor)) + + + def logger = mock(ComponentLog) + def initContext = mock(ReportingInitializationContext) + when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()) + when(initContext.getLogger()).thenReturn(logger) + + task.initialize initContext + task.setup configurationContext + task.onTrigger context + def se = task.scriptEngine + // This script should store a variable called x with a map of stats to values + assertTrue se.x?.uptime > 0 + task.offerScriptEngine(se) + + } + + class MockScriptedReportingTask 
extends ScriptedReportingTask implements AccessibleScriptingComponentHelper { + def getScriptEngine() { + return scriptingComponentHelper.engineQ.poll() + } + + def offerScriptEngine(engine) { + scriptingComponentHelper.engineQ.offer(engine) + } + + @Override + ScriptingComponentHelper getScriptingComponentHelper() { + return this.@scriptingComponentHelper + } + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/675f4f54/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/AccessibleScriptingComponentHelper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/AccessibleScriptingComponentHelper.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/AccessibleScriptingComponentHelper.java new file mode 100644 index 0000000..5e3928e --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/AccessibleScriptingComponentHelper.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.script; + +/** + * An interface for retrieving the scripting component helper of a scripting component (a processor or a reporting task). Aids in testing (for setting the Script Engine descriptor, for example). + */ +public interface AccessibleScriptingComponentHelper { + ScriptingComponentHelper getScriptingComponentHelper(); +}
