[
https://issues.apache.org/jira/browse/NIFI-210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15117433#comment-15117433
]
ASF GitHub Bot commented on NIFI-210:
-------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/185#discussion_r50852708
--- Diff:
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptProcessor.java
---
@@ -0,0 +1,862 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+
+import javax.script.Invocable;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineFactory;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Tags({"script", "invoke", "groovy", "python", "jython", "jruby", "ruby",
"javascript", "js", "lua", "luaj", "scala"})
+@CapabilityDescription("Invokes a script engine for a Processor defined in
the given script. The script must define "
++ "a valid class that implements the Processor interface, and it must set
a variable 'processor' to an instance of "
++ "the class. Processor methods such as onTrigger() will be delegated to
the scripted Processor instance. Also any "
++ "Relationships or PropertyDescriptors defined by the scripted processor
will be added to the configuration dialog.")
+@DynamicProperty(name = "A script engine property to update", value = "The
value to set it to", supportsExpressionLanguage = true,
+ description = "Updates a script engine property specified by the
Dynamic Property's key with the value specified by the Dynamic Property's
value")
+@SeeAlso({ExecuteScript.class})
+public class InvokeScriptProcessor extends AbstractSessionFactoryProcessor
{
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("FlowFiles that were successfully processed")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that were failed to process")
+ .build();
+
+
+ public static PropertyDescriptor SCRIPT_ENGINE;
+
+ public static final PropertyDescriptor SCRIPT_FILE = new
PropertyDescriptor.Builder()
+ .name("Script File")
+ .required(false)
+ .description("Path to script file to execute. Use either file
or body not both")
+ .addValidator(new StandardValidators.FileExistsValidator(true))
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor SCRIPT_BODY = new
PropertyDescriptor.Builder()
+ .name("Script Body")
+ .required(false)
+ .description("Body to script to execute. Use either file or
body not both")
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .build();
+
+
+ public static final PropertyDescriptor SCRIPT_ARGS = new
PropertyDescriptor.Builder()
+ .name("Arguments")
+ .required(false)
+ .description("Arguments to pass to scripting engine")
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .defaultValue("")
+ .build();
+
+ public static final PropertyDescriptor MODULES = new
PropertyDescriptor.Builder()
+ .name("Module Directory")
+ .description("Path to a directory which contains modules
required by the script.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(new
StandardValidators.DirectoryExistsValidator(true, false))
+ .build();
+
+ // A map from engine name to a custom configurator for that engine
+ private final Map<String, ScriptEngineConfigurator>
scriptEngineConfiguratorMap = new ConcurrentHashMap<>();
+
+ private final AtomicReference<Processor> processor = new
AtomicReference<>();
+ private final AtomicReference<Collection<ValidationResult>>
validationResults =
+ new AtomicReference(Collections.EMPTY_LIST);
+
+ private ControllerServiceLookup controllerServiceLookup;
+ private String initializationContextId;
+
+ private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+ private final Lock lock = new ReentrantLock();
+ private SynchronousFileWatcher scriptWatcher;
+
+ private Map<String, ScriptEngineFactory> scriptEngineFactoryMap;
+ private ScriptEngineManager scriptEngineManager;
+ private ScriptEngine scriptEngine;
+ private String scriptEngineName;
+ private String scriptPath;
+ private String modulePath;
+ private String scriptBody;
+
+ private ScheduledExecutorService reloadService = null;
+
+ private List<PropertyDescriptor> descriptors;
+
+ /**
+ * Initializes this processor
+ *
+ * @param context in which to perform initialization
+ */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ initializationContextId = context.getIdentifier();
+ controllerServiceLookup = context.getControllerServiceLookup();
+ }
+
+
+ /**
+ * Creates the resources needed by this processor. An attempt is made
to also initialize the scripted processor,
+ * but unless the properties (such as script engine name and script
file path) have already been specified, the
+ * script will not yet have been evaluated, so the script's
initialize() method will not be called.
+ */
+ protected void createResources() {
+
+ // Set up script file reloader service. This checks to see if the
script file has changed, and if so, tries
+ // to reload it
+ if (reloadService == null) {
+ reloadService = Executors.newScheduledThreadPool(1);
+
+ // monitor the script if configured for changes
+ reloadService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final boolean hasLock = lock.tryLock();
+
+ // if a property is changing we don't need to
reload this iteration
+ if (hasLock) {
+ try {
+ if (scriptWatcher != null &&
scriptWatcher.checkAndReset()) {
+ if (isFile(scriptPath)) {
+ // reload the actual script
+ final boolean reloaded =
reloadScriptFile(scriptPath);
+
+ // log the script was reloaded
+ if (reloaded) {
+ getLogger().info("The
configured script has been successfully reloaded.");
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (final Throwable t) {
+ final ProcessorLog logger = getLogger();
+ final String message = "Unable to reload
configured script Processor: " + t;
+
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+ }
+ }
+ }, 30, 10, TimeUnit.SECONDS);
+ }
+
+ descriptors = new ArrayList<>();
+
+ // The following is required for JRuby, should be transparent to
everything else
+ System.setProperty("org.jruby.embed.localvariable.behavior",
"persistent");
+
+ // Create list of available engines
+ scriptEngineManager = new ScriptEngineManager();
+ List<ScriptEngineFactory> scriptEngineFactories =
scriptEngineManager.getEngineFactories();
+ if (scriptEngineFactories != null) {
+ scriptEngineFactoryMap = new HashMap<>();
+ List<AllowableValue> engineList = new ArrayList<>();
+ for (ScriptEngineFactory factory : scriptEngineFactories) {
+ ScriptEngine engine = factory.getScriptEngine();
+ if (engine instanceof Invocable) {
+ engineList.add(new
AllowableValue(factory.getLanguageName()));
+ scriptEngineFactoryMap.put(factory.getLanguageName(),
factory);
+ }
+ }
+ // Sort the list by name so the list always looks the same.
+ Collections.sort(engineList, new Comparator<AllowableValue>() {
+ @Override
+ public int compare(AllowableValue o1, AllowableValue o2) {
+ if (o1 == null) {
+ return o2 == null ? 0 : 1;
+ }
+ if (o2 == null) {
+ return -1;
+ }
+ return o1.getValue().compareTo(o2.getValue());
+ }
+ });
+
+ AllowableValue[] engines = engineList.toArray(new
AllowableValue[engineList.size()]);
+ SCRIPT_ENGINE = new PropertyDescriptor.Builder()
+ .name("Script Engine")
+ .required(true)
+ .description("The engine to execute the scripted
Processor")
+ .allowableValues(engines)
+ .defaultValue(engines[0].getValue())
+ .required(true)
+ .expressionLanguageSupported(false)
+ .build();
+ descriptors.add(SCRIPT_ENGINE);
+ }
+
+ descriptors.add(SCRIPT_FILE);
+ descriptors.add(SCRIPT_BODY);
+ descriptors.add(SCRIPT_ARGS);
+ descriptors.add(MODULES);
+
+ isInitialized.set(true);
+ }
+
+ /**
+ * Returns the valid relationships for this processor. SUCCESS and
FAILURE are always returned, and if the script
+ * processor has defined additional relationships, those will be added
as well.
+ *
+ * @return a Set of Relationships supported by this processor
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ final Processor instance = processor.get();
+ if (instance != null) {
+ try {
+ relationships.addAll(instance.getRelationships());
+ } catch (final Throwable t) {
+ final ProcessorLog logger = getLogger();
+ final String message = "Unable to get relationships from
scripted Processor: " + t;
+
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+ }
+ } else {
+ // Return defaults for now
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ }
+ return Collections.unmodifiableSet(relationships);
+ }
+
+ /**
+ * Returns a list of property descriptors supported by this processor.
The list always includes properties such as
+ * script engine name, script file name, script body name, script
arguments, and an external module path. If the
+ * scripted processor also defines supported properties, those are
added to the list as well.
+ *
+ * @return a List of PropertyDescriptor objects supported by this
processor
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+
+ synchronized (isInitialized) {
+ if (!isInitialized.get()) {
+ createResources();
+ }
+ }
+ List<PropertyDescriptor> supportedPropertyDescriptors = new
ArrayList<>();
+ supportedPropertyDescriptors.addAll(descriptors);
+
+ final Processor instance = processor.get();
+ if (instance != null) {
+ try {
+ final List<PropertyDescriptor> instanceDescriptors =
instance.getPropertyDescriptors();
+ if (instanceDescriptors != null) {
+
supportedPropertyDescriptors.addAll(instanceDescriptors);
+ }
+ } catch (final Throwable t) {
+ final ProcessorLog logger = getLogger();
+ final String message = "Unable to get property descriptors
from Processor: " + t;
+
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+ }
+ }
+
+ return Collections.unmodifiableList(supportedPropertyDescriptors);
+ }
+
+ /**
+ * Returns a PropertyDescriptor for the given name. This is for the
user to be able to define their own properties
+ * which will be available as variables in the script
+ *
+ * @param propertyDescriptorName used to lookup if any property
descriptors exist for that name
+ * @return a PropertyDescriptor object corresponding to the specified
dynamic property name
+ */
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
true))
+
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .dynamic(true)
+ .build();
+ }
+
+ /**
+ * Determines whether the given path refers to a valid file
+ *
+ * @param path a path to a file
+ * @return true if the path refers to a valid file, false otherwise
+ */
+ private boolean isFile(final String path) {
+ return path != null && Files.isRegularFile(Paths.get(path));
+ }
+
+ /**
+ * Performs setup operations when the processor is scheduled to run.
This includes evaluating the processor's
+ * properties, as well as reloading the script (from file or the
"Script Body" property)
+ *
+ * @param context the context in which to perform the setup operations
+ */
+ @OnScheduled
+ public void setup(final ProcessContext context) {
+ scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
+ scriptPath = context.getProperty(SCRIPT_FILE).getValue();
+ scriptBody = context.getProperty(SCRIPT_BODY).getValue();
+ modulePath = context.getProperty(MODULES).getValue();
+ setupEngine();
+ if (processor.get() == null) {
+ if (isFile(scriptPath)) {
+ reloadScriptFile(scriptPath);
+ } else {
+ reloadScriptBody(scriptBody);
+ }
+ }
+ }
+
+ /**
+ * Configures the specified script engine. First, the engine is loaded
and instantiated using the JSR-223
+ * javax.script APIs. Then, if any script configurators have been
defined for this engine, their init() method is
+ * called, and the configurator is saved for future calls.
+ *
+ * @see org.apache.nifi.processors.script.ScriptEngineConfigurator
+ */
+ private void setupEngine() {
+ ClassLoader originalContextClassLoader =
Thread.currentThread().getContextClassLoader();
+ try {
+ ProcessorLog log = getLogger();
+
+ // Need the right classloader when the engine is created. This
ensures the NAR's execution class loader
+ // (plus the module path) becomes the parent for the script
engine
+ ClassLoader scriptEngineModuleClassLoader =
createScriptEngineModuleClassLoader(modulePath);
+ if (scriptEngineModuleClassLoader != null) {
+
Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader);
+ }
+ scriptEngine = getScriptEngine();
+ ServiceLoader<ScriptEngineConfigurator>
configuratorServiceLoader =
+ ServiceLoader.load(ScriptEngineConfigurator.class);
+ for (ScriptEngineConfigurator configurator :
configuratorServiceLoader) {
+ String configuratorScriptEngineName =
configurator.getScriptEngineName();
+ try {
+ if (configuratorScriptEngineName != null
+ &&
configuratorScriptEngineName.equalsIgnoreCase(scriptEngineName)) {
+ configurator.init(scriptEngine, modulePath);
+
scriptEngineConfiguratorMap.put(configurator.getScriptEngineName(),
configurator);
+ }
+ } catch (ScriptException se) {
+ log.error("Error initializing script engine
configurator {}",
+ new Object[]{configuratorScriptEngineName});
+ if (log.isDebugEnabled()) {
+ log.error("Error initializing script engine
configurator", se);
+ }
+ }
+ }
+
+ } finally {
+ // Restore original context class loader
+
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
+ }
+ }
+
+ /**
+ * Provides a ScriptEngine corresponding to the currently selected
script engine name.
+ * ScriptEngineManager.getEngineByName() doesn't use find
ScriptEngineFactory.getName(), which
+ * is what we used to populate the list. So just search the list of
factories until a match is
+ * found, then create and return a script engine.
+ *
+ * @return a Script Engine corresponding to the currently specified
name, or null if none is found.
+ */
+ private ScriptEngine getScriptEngine() {
+ //
+ ScriptEngineFactory factory =
scriptEngineFactoryMap.get(scriptEngineName);
+ if (factory == null) {
+ return null;
+ }
+ return factory.getScriptEngine();
+ }
+
+ /**
+ * Handles changes to this processor's properties. If changes are made
to script- or engine-related properties,
+ * the script will be reloaded.
+ *
+ * @param descriptor of the modified property
+ * @param oldValue non-null property value (previous)
+ * @param newValue the new property value or if null indicates the
property
+ */
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor,
final String oldValue, final String newValue) {
+ final ProcessorLog logger = getLogger();
+ final Processor instance = processor.get();
+
+ if (SCRIPT_FILE.equals(descriptor)
+ || SCRIPT_BODY.equals(descriptor)
+ || MODULES.equals(descriptor)
+ || SCRIPT_ENGINE.equals(descriptor)) {
+ lock.lock();
+ try {
+ // if the script is changing we'll want to reload the
instance
+ if (SCRIPT_FILE.equals(descriptor)) {
+ if (isFile(newValue)) {
+ reloadScriptFile(newValue);
+
+ // we're attempted to load the script so we need
to watch for updates
+ scriptWatcher = new
SynchronousFileWatcher(Paths.get(newValue), new LastModifiedMonitor());
+ } else {
+ // the doesn't appear to be a file
+ scriptWatcher = null;
+ }
+
+ // always want to record the configured value
+ scriptPath = newValue;
+ } else if (SCRIPT_BODY.equals(descriptor)) {
+
+ if (reloadScriptBody(newValue)) {
+ // always want to record the configured value
+ scriptBody = newValue;
+ }
+ } else if (MODULES.equals(descriptor)) {
+
+ // temporarily set new value (will be restored to
oldValue if something goes wrong)
+ modulePath = newValue;
+ try {
+ setupEngine();
+
+ boolean reloaded = false;
+
+ // we only want to reload during a module change
if the script is already loaded
+ if (scriptPath != null || scriptBody != null) {
+ if (isFile(scriptPath)) {
+ // reload the script
+ reloaded = reloadScriptFile(scriptPath);
+ } else if (scriptBody != null) {
+ reloaded = reloadScriptBody(scriptBody);
+ }
+ // log the script was reloaded
+ if (reloaded) {
+ logger.info("The configured script has
been successfully reloaded.");
+ } else {
+ throw new ProcessException("The configured
script could not be reloaded");
+ }
+ }
+ } catch (Throwable t) {
+ modulePath = oldValue;
+ logger.error(t.getLocalizedMessage(), t);
+ }
+
+ } else if (SCRIPT_ENGINE.equals(descriptor)) {
+ // The script engine has changed, so we need to set up
a new instance for the selected
+ // engine name
+ scriptEngineName = newValue;
+ setupEngine();
+ }
+ } finally {
+ lock.unlock();
+ }
+ } else if (instance != null) {
+ // If the script provides a Processor, call its
onPropertyModified() method
+ try {
+ instance.onPropertyModified(descriptor, oldValue,
newValue);
+ } catch (final Throwable t) {
+ final String message = "Unable to invoke
onPropertyModified from script Processor: " + t;
+
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a classloader to be used by the selected script engine and
the provided script file. This
+ * classloader has the InvokeScriptProcessor's classloader as a parent
(versus the current thread's context
+ * classloader) and also adds the specified module directory to the
classpath. This enables scripts
+ * to use other scripts, modules, etc. without having to build them
into the InvokeScriptProcessor NAR.
+ * If the parameter is null or empty, InvokeScriptProcessor's
classloader is returned
+ *
+ * @param modulePath The path to a directory containing modules to be
used by the script(s)
+ */
+ private ClassLoader createScriptEngineModuleClassLoader(String
modulePath) {
+ URLClassLoader newModuleClassLoader = null;
+ if (StringUtils.isEmpty(modulePath)) {
+ return InvokeScriptProcessor.class.getClassLoader();
+ }
+ try {
+ newModuleClassLoader =
+ new URLClassLoader(
+ new URL[]{new
File(modulePath).toURI().toURL()},
+ InvokeScriptProcessor.class.getClassLoader());
+ } catch (MalformedURLException mue) {
+ getLogger().error("Couldn't find modules directory at " +
modulePath, mue);
+ }
+ return newModuleClassLoader;
+ }
+
+
+ /**
+ * Reloads the script located at the given path
+ *
+ * @param scriptPath the path to the script file to be loaded
+ * @return true if the script was loaded successfully; false otherwise
+ */
+ private boolean reloadScriptFile(final String scriptPath) {
+ final Collection<ValidationResult> results = new HashSet<>();
+
+ try (final FileInputStream scriptStream = new
FileInputStream(scriptPath)) {
+ return reloadScriptFromReader(scriptStream);
+
+ } catch (final Throwable t) {
+ final ProcessorLog logger = getLogger();
+ final String message = "Unable to load script: " + t;
+
+ // If the module path has not yet been set, then this script
is likely being loaded too early and depends
+ // on modules the processor does not yet know about. If this
is the case, it will be reloaded later on
+ // property change (modules) or when scheduled
+ if (modulePath != null) {
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+
+ results.add(new ValidationResult.Builder()
+ .subject("ScriptValidation")
+ .valid(false)
+ .explanation("Unable to load script due to " + t)
+ .input(scriptPath)
+ .build());
+ }
+ }
+
+ // store the updated validation results
+ validationResults.set(results);
+
+ // return whether there was any issues loading the configured
script
+ return results.isEmpty();
+ }
+
+ /**
+ * Reloads the script defined by the given string
+ *
+ * @param scriptBody the contents of the script to be loaded
+ * @return true if the script was loaded successfully; false otherwise
+ */
+ private boolean reloadScriptBody(final String scriptBody) {
+ final Collection<ValidationResult> results = new HashSet<>();
+ try (final ByteArrayInputStream scriptStream = new
ByteArrayInputStream(scriptBody.getBytes("UTF-8"))) {
+ return reloadScriptFromReader(scriptStream);
+
+ } catch (final Throwable t) {
+ final ProcessorLog logger = getLogger();
+ final String message = "Unable to load script: " + t;
+
+ // If the module path has not yet been set, then this script
is likely being loaded too early and depends
+ // on modules the processor does not yet know about. If this
is the case, it will be reloaded later on
+ // property change (modules) or when scheduled
+ if (modulePath != null) {
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+
+ results.add(new ValidationResult.Builder()
+ .subject("ScriptValidation")
+ .valid(false)
+ .explanation("Unable to load script due to " + t)
+ .input(scriptPath)
+ .build());
+ }
+ }
+
+ // store the updated validation results
+ validationResults.set(results);
+
+ // return whether there was any issues loading the configured
script
+ return results.isEmpty();
+ }
+
+ /**
+ * Reloads the script Processor. This must be called within the lock.
+ *
+ * @param scriptStream An input stream associated with the script
content
+ * @returns Whether the script was successfully reloaded
+ */
+ private boolean reloadScriptFromReader(final InputStream scriptStream)
{
+ // note we are starting here with a fresh listing of validation
+ // results since we are (re)loading a new/updated script. any
+ // existing validation results are not relevant
+ final Collection<ValidationResult> results = new HashSet<>();
+
+ try {
+ // get the engine and ensure its invocable
+ if (scriptEngine instanceof Invocable) {
+ final Invocable invocable = (Invocable) scriptEngine;
+
+ // Find a custom configurator and invoke their eval()
method
+ ScriptEngineConfigurator configurator =
+ scriptEngineConfiguratorMap.get(scriptEngineName);
+ if (configurator != null) {
+ configurator.eval(scriptEngine, scriptStream,
modulePath);
+ } else {
+ // evaluate the script
+ scriptEngine.eval(new BufferedReader(new
InputStreamReader(scriptStream)));
+ }
+
+ // get configured processor from the script (if it exists)
+ final Object obj = scriptEngine.get("processor");
+ final ProcessorLog logger = getLogger();
+
+ try {
+ // set the logger if the processor wants it
+ invocable.invokeMethod(obj, "setLogger", logger);
+ } catch (final NoSuchMethodException nsme) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Configured script Processor does not
contain a setLogger method.");
+ }
+ }
+
+ // record the processor for use later
+ final Processor scriptProcessor =
invocable.getInterface(obj, Processor.class);
+ processor.set(scriptProcessor);
+
+ if (scriptProcessor != null) {
+ try {
+ scriptProcessor.initialize(new
ProcessorInitializationContext() {
+ @Override
+ public String getIdentifier() {
+ return initializationContextId;
+ }
+
+ @Override
+ public ProcessorLog getLogger() {
+ return logger;
+ }
+
+ @Override
+ public ControllerServiceLookup
getControllerServiceLookup() {
+ return controllerServiceLookup;
+ }
+ });
+ } catch (final Throwable t) {
+ final String message = "Unable to initialize
scripted Processor: " + t;
+
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+ }
+ }
+ }
+
+ } catch (final Throwable t) {
+ final ProcessorLog logger = getLogger();
+ final String message = "Unable to load script: " + t;
+
+ // If the module path has not yet been set, then this script
is likely being loaded too early and depends
+ // on modules the processor does not yet know about. If this
is the case, it will be reloaded later on
+ // property change (modules) or when scheduled
+ //
+ // Alternatively, if the module is
+ if (modulePath != null) {
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+
+ results.add(new ValidationResult.Builder()
+ .subject("ScriptValidation")
+ .valid(false)
+ .explanation("Unable to load script due to " + t)
+ .input(scriptPath)
+ .build());
+ }
+ }
+
+ // store the updated validation results
+ validationResults.set(results);
+
+ // return whether there was any issues loading the configured
script
+ return results.isEmpty();
+ }
+
+ /**
+ * Invokes the validate() routine provided by the script, allowing for
custom validation code.
+ * This method assumes there is a valid Processor defined in the
script and it has been loaded
+ * by the InvokeScriptProcessor processor
+ *
+ * @param context The validation context to be passed into the custom
validate method
+ * @return A collection of ValidationResults returned by the custom
validate method
+ */
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext context) {
+ final Processor instance = processor.get();
+ final Collection<ValidationResult> currentValidationResults =
validationResults.get();
+
+ // if there was existing validation errors and the processor
loaded successfully
+ if (currentValidationResults.isEmpty() && instance != null) {
+ try {
+ // defer to the underlying processor for validation
+ final Collection<ValidationResult> instanceResults =
instance.validate(context);
+ if (instanceResults != null && instanceResults.size() > 0)
{
+ // return the validation results from the underlying
instance
+ return instanceResults;
+ }
+ } catch (final Throwable t) {
+ final ProcessorLog logger = getLogger();
+ final String message = "Unable to validate the script
Processor: " + t;
+
+ logger.error(message);
--- End diff --
Would just pass the Throwable to the logger here instead of doing it in
isDebugEnabled, since the logger will make that check itself - though we should
probably catch Exception instead of Throwable?
> Provide an ExecuteScript processor
> ----------------------------------
>
> Key: NIFI-210
> URL: https://issues.apache.org/jira/browse/NIFI-210
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Affects Versions: 0.0.1
> Reporter: A. Steven Anderson
> Assignee: Matt Burgess
> Labels: processor, scala
> Fix For: 0.5.0
>
> Attachments:
> 0001-NIFI-210-few-tweaks-to-drop-static-reference-and-fix.patch
>
>
> Add latest Scala version support for ExcecuteScript processor.
> Should also support Clojure as per discussion and request on mailing list
> http://mail-archives.apache.org/mod_mbox/nifi-dev/201506.mbox/%3CCAMpSqch4GK1gnw6M1u8tH6AN8e_miXZN5SNkAeMjBujXYGqJiw%40mail.gmail.com%3E
> UPDATE: The ScriptEngine for Clojure is not being maintained and is not
> currently available via Maven Central or a public repository. Recommend
> adding Clojure as a separate Improvement Jira case.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)