This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 3e774bc NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues (#5116) 3e774bc is described below commit 3e774bc5beadd62a646794cc659db9902c5ab1fa Author: Matthew Burgess <mattyb...@apache.org> AuthorDate: Fri Jun 11 14:44:55 2021 -0400 NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues (#5116) NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues --- .../lookup/script/BaseScriptedLookupService.java | 116 +++++++++---------- .../nifi/processors/script/ExecuteScript.java | 77 +++---------- .../processors/script/InvokeScriptedProcessor.java | 83 +++++++------- ...ptEngineConfigurator.java => ScriptRunner.java} | 24 ++-- .../processors/script/ScriptedTransformRecord.java | 16 +-- .../script/AbstractScriptedRecordFactory.java | 10 -- .../apache/nifi/record/script/ScriptedReader.java | 28 +++-- .../record/script/ScriptedRecordSetWriter.java | 29 +++-- .../record/sink/script/ScriptedRecordSink.java | 75 ++++++------ .../reporting/script/ScriptedReportingTask.java | 41 ++----- .../rules/engine/script/ScriptedRulesEngine.java | 73 ++++++------ .../handlers/script/ScriptedActionHandler.java | 114 ++++++++++--------- .../script/AbstractScriptedControllerService.java | 9 +- .../apache/nifi/script/ScriptRunnerFactory.java | 126 +++++++++++++++++++++ .../nifi/script/ScriptingComponentHelper.java | 112 +++++++----------- .../AbstractModuleClassloaderConfigurator.java | 88 -------------- ...gineConfigurator.java => BaseScriptRunner.java} | 27 ++--- ...eConfigurator.java => ClojureScriptRunner.java} | 35 +++--- ...eConfigurator.java => GenericScriptRunner.java} | 22 ++-- ...neConfigurator.java => GroovyScriptRunner.java} | 24 ++-- ...nfigurator.java => JavascriptScriptRunner.java} | 19 ++-- .../impl/JythonScriptEngineConfigurator.java | 81 ------------- .../nifi/script/impl/JythonScriptRunner.java | 62 ++++++++++ 
...nifi.processors.script.ScriptEngineConfigurator | 8 +- .../script/ScriptedReportingTaskTest.groovy | 23 ++-- .../src/test/resources/groovy/test_reader.groovy | 12 +- 26 files changed, 619 insertions(+), 715 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java index ab2cc47..9c7a4b4 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java @@ -32,12 +32,13 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.lookup.LookupService; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; import org.apache.nifi.script.AbstractScriptedControllerService; import org.apache.nifi.script.ScriptingComponentHelper; import org.apache.nifi.script.ScriptingComponentUtils; import javax.script.Invocable; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; import javax.script.ScriptException; import java.io.File; import java.util.ArrayList; @@ -155,10 +156,6 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService || ScriptingComponentUtils.MODULES.equals(descriptor) || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { scriptNeedsReload.set(true); - // Need to reset scriptEngine if the value has changed - if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { - scriptEngine = null; - } } else if (instance != null) { // If the script provides a ConfigurableComponent, call its 
onPropertyModified() method try { @@ -181,67 +178,67 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService super.onEnabled(context); // Call an non-interface method onEnabled(context), to allow a scripted LookupService the chance to set up as necessary - final Invocable invocable = (Invocable) scriptEngine; - if (configurationContext != null) { - try { - // Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods, - // where lookupService is a proxied interface - final Object obj = scriptEngine.get("lookupService"); - if (obj != null) { - try { - invocable.invokeMethod(obj, "onEnabled", context); - } catch (final NoSuchMethodException nsme) { - if (getLogger().isDebugEnabled()) { - getLogger().debug("Configured script LookupService does not contain an onEnabled() method."); + if (scriptRunner != null) { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + final Invocable invocable = (Invocable) scriptEngine; + if (configurationContext != null) { + try { + // Get the actual object from the script engine, versus the proxy stored in lookupService. 
The object may have additional methods, + // where lookupService is a proxied interface + final Object obj = scriptEngine.get("lookupService"); + if (obj != null) { + try { + invocable.invokeMethod(obj, "onEnabled", context); + } catch (final NoSuchMethodException nsme) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Configured script LookupService does not contain an onEnabled() method."); + } } + } else { + throw new ScriptException("No LookupService was defined by the script."); } - } else { - throw new ScriptException("No LookupService was defined by the script."); + } catch (ScriptException se) { + throw new ProcessException("Error executing onEnabled(context) method", se); } - } catch (ScriptException se) { - throw new ProcessException("Error executing onEnabled(context) method", se); } + } else { + throw new ProcessException("Error creating ScriptRunner"); } } @OnDisabled public void onDisabled(final ConfigurationContext context) { // Call an non-interface method onDisabled(context), to allow a scripted LookupService the chance to shut down as necessary - final Invocable invocable = (Invocable) scriptEngine; - if (configurationContext != null) { - try { - // Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods, - // where lookupService is a proxied interface - final Object obj = scriptEngine.get("lookupService"); - if (obj != null) { - try { - invocable.invokeMethod(obj, "onDisabled", context); - } catch (final NoSuchMethodException nsme) { - if (getLogger().isDebugEnabled()) { - getLogger().debug("Configured script LookupService does not contain an onDisabled() method."); + if (scriptRunner != null) { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + final Invocable invocable = (Invocable) scriptEngine; + if (configurationContext != null) { + try { + // Get the actual object from the script engine, versus the proxy stored in lookupService. 
The object may have additional methods, + // where lookupService is a proxied interface + final Object obj = scriptRunner.getScriptEngine().get("lookupService"); + if (obj != null) { + try { + invocable.invokeMethod(obj, "onDisabled", context); + } catch (final NoSuchMethodException nsme) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Configured script LookupService does not contain an onDisabled() method."); + } } + } else { + throw new ScriptException("No LookupService was defined by the script."); } - } else { - throw new ScriptException("No LookupService was defined by the script."); + } catch (ScriptException se) { + throw new ProcessException("Error executing onDisabled(context) method", se); } - } catch (ScriptException se) { - throw new ProcessException("Error executing onDisabled(context) method", se); } + } else { + throw new ProcessException("Error creating ScriptRunner"); } } @Override public void setup() { - // Create a single script engine, the Processor object is reused by each task - if (scriptEngine == null) { - scriptingComponentHelper.setup(1, getLogger()); - scriptEngine = scriptingComponentHelper.engineQ.poll(); - } - - if (scriptEngine == null) { - throw new ProcessException("No script engine available!"); - } - if (scriptNeedsReload.get() || lookupService.get() == null) { if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { reloadScriptFile(scriptingComponentHelper.getScriptPath()); @@ -266,23 +263,26 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService final Collection<ValidationResult> results = new HashSet<>(); try { + // Create a single script engine, the Processor object is reused by each task + if (scriptRunner == null) { + scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger()); + scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); + } + + if (scriptRunner == null) { + throw new ProcessException("No script runner available!"); + } + // 
get the engine and ensure its invocable + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); if (scriptEngine instanceof Invocable) { final Invocable invocable = (Invocable) scriptEngine; - // Find a custom configurator and invoke their eval() method - ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - if (configurator != null) { - configurator.reset(); - configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - } else { - // evaluate the script - scriptEngine.eval(scriptBody); - } + // evaluate the script + scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE)); // get configured LookupService from the script (if it exists) - final Object obj = scriptEngine.get("lookupService"); + final Object obj = scriptRunner.getScriptEngine().get("lookupService"); if (obj != null) { final ComponentLog logger = getLogger(); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index f3b5d92..8d64dcf 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -52,7 +52,6 @@ import org.apache.nifi.search.Searchable; import javax.script.Bindings; import javax.script.ScriptContext; import javax.script.ScriptEngine; -import javax.script.ScriptException; import javax.script.SimpleBindings; import java.io.FileInputStream; import java.io.IOException; @@ -120,7 +119,7 @@ public class ExecuteScript 
extends AbstractSessionFactoryProcessor implements Se protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { synchronized (scriptingComponentHelper.isInitialized) { if (!scriptingComponentHelper.isInitialized.get()) { - scriptingComponentHelper.createResources(); + scriptingComponentHelper.createResources(false); } } @@ -150,35 +149,6 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se return scriptingComponentHelper.customValidate(validationContext); } - - /** - * Handles changes to this processor's properties. If changes are made to - * script- or engine-related properties, the script will be reloaded. - * - * @param descriptor of the modified property - * @param oldValue non-null property value (previous) - * @param newValue the new property value or if null indicates the property - */ - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - - if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor) - || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor) - || ScriptingComponentUtils.MODULES.equals(descriptor) - || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { - - // Reset the configurator on change, this can indicate to the configurator to recompile the script on next init() - String scriptEngineName = scriptingComponentHelper.getScriptEngineName(); - if (scriptEngineName != null) { - ScriptEngineConfigurator configurator = - scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); - if (configurator != null) { - configurator.reset(); - } - } - } - } - /** * Performs setup operations when the processor is scheduled to run. 
This includes evaluating the processor's * properties, as well as reloading the script (from file or the "Script Body" property) @@ -189,9 +159,6 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se public void setup(final ProcessContext context) { scriptingComponentHelper.setupVariables(context); - // Create a script engine for each possible task - int maxTasks = context.getMaxConcurrentTasks(); - scriptingComponentHelper.setup(maxTasks, getLogger()); scriptToRun = scriptingComponentHelper.getScriptBody(); try { @@ -203,6 +170,10 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se } catch (IOException ioe) { throw new ProcessException(ioe); } + + // Create a script engine for each possible task + int maxTasks = context.getMaxConcurrentTasks(); + scriptingComponentHelper.setupScriptRunners(maxTasks, scriptToRun, getLogger()); } /** @@ -220,17 +191,18 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { synchronized (scriptingComponentHelper.isInitialized) { if (!scriptingComponentHelper.isInitialized.get()) { - scriptingComponentHelper.createResources(); + scriptingComponentHelper.createResources(false); } } - ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll(); + ScriptRunner scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); ComponentLog log = getLogger(); - if (scriptEngine == null) { + if (scriptRunner == null) { // No engine available so nothing more to do here return; } ProcessSession session = sessionFactory.createSession(); try { + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); try { Bindings bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE); @@ -253,31 +225,16 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se } } - scriptEngine.setBindings(bindings, 
ScriptContext.ENGINE_SCOPE); - - // Execute any engine-specific configuration before the script is evaluated - ScriptEngineConfigurator configurator = - scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - - // Evaluate the script with the configurator (if it exists) or the engine - if (configurator != null) { - configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules()); - } else { - scriptEngine.eval(scriptToRun); - } + scriptRunner.run(bindings); // Commit this session for the user. This plus the outermost catch statement mimics the behavior // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base // class with InvokeScriptedProcessor session.commitAsync(); - } catch (ScriptException e) { - // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init() - ScriptEngineConfigurator configurator = - scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - if (configurator != null) { - configurator.reset(); - } + scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner); + } catch (Throwable t) { + // Create a new ScriptRunner to replace the one that caused an exception + scriptingComponentHelper.setupScriptRunners(false, 1, scriptToRun, getLogger()); // The below 'session.rollback(true)' reverts any changes made during this session (all FlowFiles are // restored back to their initial session state and back to their original queues after being penalized). @@ -285,7 +242,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se // cause resource exhaustion. In case a user does not want to yield, it can be set to 0s in the processor // configuration. 
context.yield(); - throw new ProcessException(e); + throw new ProcessException(t); } } catch (final Throwable t) { // Mimic AbstractProcessor behavior here @@ -295,8 +252,6 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se // the flow file from the session binding (ff = session.get()). session.rollback(true); throw t; - } finally { - scriptingComponentHelper.engineQ.offer(scriptEngine); } } @@ -310,7 +265,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se // Create the resources whether or not they have been created already, this method is guaranteed to have the instance classloader set // as the thread context class loader. Other methods that call createResources() may be called from other threads with different // classloaders - scriptingComponentHelper.createResources(); + scriptingComponentHelper.createResources(false); } @Override diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 366f8fa..681c798 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -51,6 +51,7 @@ import org.apache.nifi.script.ScriptingComponentUtils; import org.apache.nifi.script.impl.FilteredPropertiesValidationContextAdapter; import javax.script.Invocable; +import javax.script.ScriptContext; import javax.script.ScriptEngine; import javax.script.ScriptException; import java.io.File; @@ -93,7 +94,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { private final AtomicBoolean scriptNeedsReload = 
new AtomicBoolean(true); - private volatile ScriptEngine scriptEngine = null; + private volatile ScriptRunner scriptRunner = null; private volatile String kerberosServicePrincipal = null; private volatile File kerberosConfigFile = null; private volatile File kerberosServiceKeytab = null; @@ -152,8 +153,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { scriptingComponentHelper.createResources(); } } - List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(); - supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors()); + List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(scriptingComponentHelper.getDescriptors()); final Processor instance = processor.get(); if (instance != null) { @@ -213,16 +213,6 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { } public void setup() { - // Create a single script engine, the Processor object is reused by each task - if(scriptEngine == null) { - scriptingComponentHelper.setup(1, getLogger()); - scriptEngine = scriptingComponentHelper.engineQ.poll(); - } - - if (scriptEngine == null) { - throw new ProcessException("No script engine available!"); - } - if (scriptNeedsReload.get() || processor.get() == null) { if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { reloadScriptFile(scriptingComponentHelper.getScriptPath()); @@ -254,7 +244,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { || ScriptingComponentUtils.MODULES.equals(descriptor) || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { scriptNeedsReload.set(true); - scriptEngine = null; //reset engine. This happens only when a processor is stopped, so there won't be any performance impact in run-time. + scriptRunner = null; //reset engine. This happens only when a processor is stopped, so there won't be any performance impact in run-time. 
} else if (instance != null) { // If the script provides a Processor, call its onPropertyModified() method try { @@ -342,20 +332,22 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { final Collection<ValidationResult> results = new HashSet<>(); try { + // Create a single script engine, the Processor object is reused by each task + if (scriptRunner == null) { + scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger()); + scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); + } + + if (scriptRunner == null) { + throw new ProcessException("No script runner available!"); + } // get the engine and ensure its invocable + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); if (scriptEngine instanceof Invocable) { final Invocable invocable = (Invocable) scriptEngine; - // Find a custom configurator and invoke their eval() method - ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - if (configurator != null) { - configurator.reset(); - configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - } else { - // evaluate the script - scriptEngine.eval(scriptBody); - } + // evaluate the script + scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE)); // get configured processor from the script (if it exists) final Object obj = scriptEngine.get("processor"); @@ -579,32 +571,37 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { invokeScriptedProcessorMethod("onStopped", context); scriptingComponentHelper.stop(); processor.set(null); - scriptEngine = null; + scriptRunner = null; } private void invokeScriptedProcessorMethod(String methodName, Object... 
params) { // Run the scripted processor's method here, if it exists - if (scriptEngine instanceof Invocable) { - final Invocable invocable = (Invocable) scriptEngine; - final Object obj = scriptEngine.get("processor"); - if (obj != null) { - - ComponentLog logger = getLogger(); - try { - invocable.invokeMethod(obj, methodName, params); - } catch (final NoSuchMethodException nsme) { - if (logger.isDebugEnabled()) { - logger.debug("Configured script Processor does not contain the method " + methodName); - } - } catch (final Exception e) { - // An error occurred during onScheduled, propagate it up - logger.error("Error while executing the scripted processor's method " + methodName, e); - if (e instanceof ProcessException) { - throw (ProcessException) e; + if (scriptRunner != null) { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + if (scriptEngine instanceof Invocable) { + final Invocable invocable = (Invocable) scriptEngine; + final Object obj = scriptEngine.get("processor"); + if (obj != null) { + + ComponentLog logger = getLogger(); + try { + invocable.invokeMethod(obj, methodName, params); + } catch (final NoSuchMethodException nsme) { + if (logger.isDebugEnabled()) { + logger.debug("Configured script Processor does not contain the method " + methodName); + } + } catch (final Exception e) { + // An error occurred during onScheduled, propagate it up + logger.error("Error while executing the scripted processor's method " + methodName, e); + if (e instanceof ProcessException) { + throw (ProcessException) e; + } + throw new ProcessException(e); } - throw new ProcessException(e); } } + } else { + throw new ProcessException("Error creating ScriptRunner"); } } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptRunner.java similarity index 61% rename from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java rename to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptRunner.java index 995c16e..dd2278a 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptRunner.java @@ -17,26 +17,22 @@ package org.apache.nifi.processors.script; -import org.apache.nifi.logging.ComponentLog; - +import javax.script.Bindings; import javax.script.ScriptEngine; import javax.script.ScriptException; -import java.net.URL; -/** - * This interface describes callback methods used by the ExecuteScript/InvokeScript processors to perform - * engine-specific tasks at various points in the engine lifecycle. 
- */ -public interface ScriptEngineConfigurator { +public interface ScriptRunner { String getScriptEngineName(); - URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log); - - Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException; + ScriptEngine getScriptEngine(); - Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException; + /** + * Runs the script held by this ScriptRunner using the provided Bindings map (to include the session object for example) + * + * @param bindings The Bindings the underlying engine should use when running the script + * @throws ScriptException if an error occurs during execution of the script + */ + void run(Bindings bindings) throws ScriptException; - default void reset() { - } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java index 46b5e04..efe54de 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java @@ -172,10 +172,6 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search } scriptingComponentHelper.setupVariables(context); - - // Create a script engine for each possible task - final int maxTasks = context.getMaxConcurrentTasks(); - scriptingComponentHelper.setup(maxTasks, getLogger()); scriptToRun = scriptingComponentHelper.getScriptBody(); if (scriptToRun == null && scriptingComponentHelper.getScriptPath() != null) { @@ -183,6 +179,11 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search 
scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset()); } } + + // Create a script runner for each possible task + final int maxTasks = context.getMaxConcurrentTasks(); + scriptingComponentHelper.setupScriptRunners(maxTasks, scriptToRun, getLogger()); + // Always compile when first run compiledScriptRef.set(null); } @@ -195,8 +196,8 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search return; } - final ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll(); - if (scriptEngine == null) { + final ScriptRunner scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); + if (scriptRunner == null) { // This shouldn't happen. But just in case. session.rollback(); return; @@ -205,6 +206,7 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search try { final ScriptEvaluator evaluator; try { + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); evaluator = createEvaluator(scriptEngine, flowFile); } catch (final ScriptException se) { getLogger().error("Failed to initialize script engine", se); @@ -214,7 +216,7 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search transform(flowFile, evaluator, context, session); } finally { - scriptingComponentHelper.engineQ.offer(scriptEngine); + scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java index aa36ab4..0c5816d 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java +++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java @@ -30,16 +30,6 @@ public abstract class AbstractScriptedRecordFactory<T> extends AbstractScriptedC protected final AtomicReference<T> recordFactory = new AtomicReference<>(); public void setup() { - // Create a single script engine, the Processor object is reused by each task - if (scriptEngine == null) { - scriptingComponentHelper.setup(1, getLogger()); - scriptEngine = scriptingComponentHelper.engineQ.poll(); - } - - if (scriptEngine == null) { - throw new ProcessException("No script engine available!"); - } - if (scriptNeedsReload.get() || recordFactory.get() == null) { if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { if (!reloadScriptFile(scriptingComponentHelper.getScriptPath())) { diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java index 009d1d2..fa4a20c 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java @@ -25,13 +25,15 @@ import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import 
org.apache.nifi.serialization.RecordReaderFactory; import javax.script.Invocable; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; import javax.script.ScriptException; import java.io.IOException; import java.io.InputStream; @@ -65,6 +67,8 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa try { return recordFactory.get().createRecordReader(variables, in, inputLength, logger); } catch (UndeclaredThrowableException ute) { + scriptRunner = null; + scriptingComponentHelper.scriptRunnerQ.clear(); throw new IOException(ute.getCause()); } } @@ -84,23 +88,23 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa final Collection<ValidationResult> results = new HashSet<>(); try { + // Create a single script engine, the Processor object is reused by each task + scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger()); + scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); + + if (scriptRunner == null) { + throw new ProcessException("No script runner available!"); + } // get the engine and ensure its invocable + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); if (scriptEngine instanceof Invocable) { final Invocable invocable = (Invocable) scriptEngine; - // Find a custom configurator and invoke their eval() method - ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - if (configurator != null) { - configurator.reset(); - configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - } else { - // evaluate the script - scriptEngine.eval(scriptBody); - } + // evaluate the script + scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE)); // get configured processor from the script (if it exists) - final Object obj = 
scriptEngine.get("reader"); + final Object obj = scriptRunner.getScriptEngine().get("reader"); if (obj != null) { final ComponentLog logger = getLogger(); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java index 99763b0..83f26df 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java @@ -25,13 +25,15 @@ import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordSchema; import javax.script.Invocable; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; import javax.script.ScriptException; import java.io.IOException; import java.io.OutputStream; @@ -88,23 +90,26 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor final Collection<ValidationResult> results = new HashSet<>(); try { + // Create a single script engine, the Processor object is reused by each task + if (scriptRunner == null) { + scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger()); + scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); + } + + if (scriptRunner == 
null) { + throw new ProcessException("No script runner available!"); + } // get the engine and ensure its invocable + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); if (scriptEngine instanceof Invocable) { final Invocable invocable = (Invocable) scriptEngine; - // Find a custom configurator and invoke their eval() method - ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - if (configurator != null) { - configurator.reset(); - configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - } else { - // evaluate the script - scriptEngine.eval(scriptBody); - } + // evaluate the script + scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE)); + // get configured processor from the script (if it exists) - final Object obj = scriptEngine.get("writer"); + final Object obj = scriptRunner.getScriptEngine().get("writer"); if (obj != null) { final ComponentLog logger = getLogger(); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java index f1bbcde..652965f 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java @@ -30,7 +30,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import 
org.apache.nifi.processors.script.ScriptEngineConfigurator; import org.apache.nifi.record.sink.RecordSinkService; import org.apache.nifi.script.AbstractScriptedControllerService; import org.apache.nifi.script.ScriptingComponentHelper; @@ -38,6 +37,8 @@ import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.RecordSet; import javax.script.Invocable; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; import javax.script.ScriptException; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; @@ -104,16 +105,6 @@ public class ScriptedRecordSink extends AbstractScriptedControllerService implem } public void setup() { - // Create a single script engine, the Processor object is reused by each task - if (scriptEngine == null) { - scriptingComponentHelper.setup(1, getLogger()); - scriptEngine = scriptingComponentHelper.engineQ.poll(); - } - - if (scriptEngine == null) { - throw new ProcessException("No script engine available!"); - } - if (scriptNeedsReload.get() || recordSink.get() == null) { if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { reloadScriptFile(scriptingComponentHelper.getScriptPath()); @@ -137,23 +128,26 @@ public class ScriptedRecordSink extends AbstractScriptedControllerService implem final Collection<ValidationResult> results = new HashSet<>(); try { + // Create a single script engine, the Processor object is reused by each task + if (scriptRunner == null) { + scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger()); + scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); + } + + if (scriptRunner == null) { + throw new ProcessException("No script runner available!"); + } // get the engine and ensure its invocable + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); if (scriptEngine instanceof Invocable) { final Invocable invocable = (Invocable) scriptEngine; - // Find a custom configurator and invoke their 
eval() method - ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - if (configurator != null) { - configurator.reset(); - configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - } else { - // evaluate the script - scriptEngine.eval(scriptBody); - } + // evaluate the script + scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE)); + // get configured processor from the script (if it exists) - final Object obj = scriptEngine.get("recordSink"); + final Object obj = scriptRunner.getScriptEngine().get("recordSink"); if (obj != null) { final ComponentLog logger = getLogger(); @@ -217,26 +211,31 @@ public class ScriptedRecordSink extends AbstractScriptedControllerService implem super.onEnabled(context); // Call an non-interface method onEnabled(context), to allow a scripted RecordSinkService the chance to set up as necessary - final Invocable invocable = (Invocable) scriptEngine; - if (configurationContext != null) { - try { - // Get the actual object from the script engine, versus the proxy stored in RecordSinkService. The object may have additional methods, - // where RecordSinkService is a proxied interface - final Object obj = scriptEngine.get("recordSink"); - if (obj != null) { - try { - invocable.invokeMethod(obj, "onEnabled", context); - } catch (final NoSuchMethodException nsme) { - if (getLogger().isDebugEnabled()) { - getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method."); + if (scriptRunner != null) { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + final Invocable invocable = (Invocable) scriptEngine; + if (configurationContext != null) { + try { + // Get the actual object from the script engine, versus the proxy stored in RecordSinkService. 
The object may have additional methods, + // where RecordSinkService is a proxied interface + final Object obj = scriptRunner.getScriptEngine().get("recordSink"); + if (obj != null) { + try { + invocable.invokeMethod(obj, "onEnabled", context); + } catch (final NoSuchMethodException nsme) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method."); + } } + } else { + throw new ScriptException("No RecordSinkService was defined by the script."); } - } else { - throw new ScriptException("No RecordSinkService was defined by the script."); + } catch (ScriptException se) { + throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se); } - } catch (ScriptException se) { - throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se); } + } else { + throw new ProcessException("Error creating ScriptRunner"); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java index e19022c..f6a7c33 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java @@ -33,7 +33,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.metrics.jvm.JmxJvmMetrics; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; +import org.apache.nifi.processors.script.ScriptRunner; import org.apache.nifi.reporting.AbstractReportingTask; import 
org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.script.ScriptingComponentHelper; @@ -128,8 +128,7 @@ public class ScriptedReportingTask extends AbstractReportingTask { public void setup(final ConfigurationContext context) { scriptingComponentHelper.setupVariables(context); - // Create a script engine for each possible task - scriptingComponentHelper.setup(1, getLogger()); + // Create a script runner scriptToRun = scriptingComponentHelper.getScriptBody(); try { @@ -142,6 +141,7 @@ public class ScriptedReportingTask extends AbstractReportingTask { } catch (IOException ioe) { throw new ProcessException(ioe); } + scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger()); vmMetrics = JmxJvmMetrics.getInstance(); } @@ -153,15 +153,15 @@ public class ScriptedReportingTask extends AbstractReportingTask { scriptingComponentHelper.createResources(); } } - ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll(); + ScriptRunner scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); ComponentLog log = getLogger(); - if (scriptEngine == null) { + if (scriptRunner == null) { // No engine available so nothing more to do here return; } try { - + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); try { Bindings bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE); if (bindings == null) { @@ -180,38 +180,19 @@ public class ScriptedReportingTask extends AbstractReportingTask { } } } + scriptRunner.run(bindings); + scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner); - scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE); - - // Execute any engine-specific configuration before the script is evaluated - ScriptEngineConfigurator configurator = - scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - - // Evaluate the script with the configurator (if it exists) or the engine - if (configurator != null) { - 
configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules()); - } else { - scriptEngine.eval(scriptToRun); - } } catch (ScriptException e) { - // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init() - ScriptEngineConfigurator configurator = - scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); + // Create a new ScriptRunner to replace the one that caused an exception + scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger()); - // Evaluate the script with the configurator (if it exists) or the engine - if (configurator != null) { - configurator.reset(); - } throw new ProcessException(e); } } catch (final Throwable t) { // Mimic AbstractProcessor behavior here - getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); + getLogger().error("{} failed to process due to {}; rolling back session", this, t); throw t; - } finally { - scriptingComponentHelper.engineQ.offer(scriptEngine); } - } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java index 7c46d19..7d09509 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java @@ -27,13 +27,14 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; import 
org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; import org.apache.nifi.rules.Action; import org.apache.nifi.rules.engine.RulesEngineService; import org.apache.nifi.script.AbstractScriptedControllerService; import org.apache.nifi.script.ScriptingComponentHelper; import javax.script.Invocable; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.Collection; import java.util.Collections; @@ -75,16 +76,6 @@ public class ScriptedRulesEngine extends AbstractScriptedControllerService imple } public void setup() { - // Create a single script engine, the component object is reused by each task - if (scriptEngine == null) { - scriptingComponentHelper.setup(1, getLogger()); - scriptEngine = scriptingComponentHelper.engineQ.poll(); - } - - if (scriptEngine == null) { - throw new ProcessException("No script engine available!"); - } - if (scriptNeedsReload.get() || rulesEngine.get() == null) { if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { reloadScriptFile(scriptingComponentHelper.getScriptPath()); @@ -108,20 +99,23 @@ public class ScriptedRulesEngine extends AbstractScriptedControllerService imple final Collection<ValidationResult> results = new HashSet<>(); try { + // Create a single script engine, the Processor object is reused by each task + if (scriptRunner == null) { + scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger()); + scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); + } + + if (scriptRunner == null) { + throw new ProcessException("No script runner available!"); + } // get the engine and ensure its invocable + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); if (scriptEngine instanceof Invocable) { final Invocable invocable = (Invocable) scriptEngine; - // Find a custom configurator and invoke their eval() method - ScriptEngineConfigurator 
configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - if (configurator != null) { - configurator.reset(); - configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - } else { - // evaluate the script - scriptEngine.eval(scriptBody); - } + // evaluate the script + scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE)); + // get configured processor from the script (if it exists) final Object obj = scriptEngine.get("rulesEngine"); @@ -188,26 +182,31 @@ public class ScriptedRulesEngine extends AbstractScriptedControllerService imple super.onEnabled(context); // Call an non-interface method onEnabled(context), to allow a scripted RulesEngineService the chance to set up as necessary - final Invocable invocable = (Invocable) scriptEngine; - if (configurationContext != null) { - try { - // Get the actual object from the script engine, versus the proxy stored in RulesEngineService. The object may have additional methods, - // where RulesEngineService is a proxied interface - final Object obj = scriptEngine.get("rulesEngine"); - if (obj != null) { - try { - invocable.invokeMethod(obj, "onEnabled", context); - } catch (final NoSuchMethodException nsme) { - if (getLogger().isDebugEnabled()) { - getLogger().debug("Configured script RulesEngineService does not contain an onEnabled() method."); + if (scriptRunner != null) { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + final Invocable invocable = (Invocable) scriptEngine; + if (configurationContext != null) { + try { + // Get the actual object from the script engine, versus the proxy stored in RulesEngineService. 
The object may have additional methods, + // where RulesEngineService is a proxied interface + final Object obj = scriptEngine.get("rulesEngine"); + if (obj != null) { + try { + invocable.invokeMethod(obj, "onEnabled", context); + } catch (final NoSuchMethodException nsme) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Configured script RulesEngineService does not contain an onEnabled() method."); + } } + } else { + throw new ScriptException("No RulesEngineService was defined by the script."); } - } else { - throw new ScriptException("No RulesEngineService was defined by the script."); + } catch (ScriptException se) { + throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se); } - } catch (ScriptException se) { - throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se); } + } else { + throw new ProcessException("Error creating ScriptRunner"); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java index bca51a0..2b48bf6 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java @@ -28,7 +28,6 @@ import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; import org.apache.nifi.rules.Action; import org.apache.nifi.rules.ActionHandler; import 
org.apache.nifi.rules.PropertyContextActionHandler; @@ -36,6 +35,8 @@ import org.apache.nifi.script.AbstractScriptedControllerService; import org.apache.nifi.script.ScriptingComponentHelper; import javax.script.Invocable; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.Collection; import java.util.Collections; @@ -77,16 +78,6 @@ public class ScriptedActionHandler extends AbstractScriptedControllerService imp } public void setup() { - // Create a single script engine, the component object is reused by each task - if (scriptEngine == null) { - scriptingComponentHelper.setup(1, getLogger()); - scriptEngine = scriptingComponentHelper.engineQ.poll(); - } - - if (scriptEngine == null) { - throw new ProcessException("No script engine available!"); - } - if (scriptNeedsReload.get() || actionHandler.get() == null) { if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { reloadScriptFile(scriptingComponentHelper.getScriptPath()); @@ -110,23 +101,26 @@ public class ScriptedActionHandler extends AbstractScriptedControllerService imp final Collection<ValidationResult> results = new HashSet<>(); try { + // Create a single script engine, the Processor object is reused by each task + if (scriptRunner == null) { + scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger()); + scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll(); + } + + if (scriptRunner == null) { + throw new ProcessException("No script runner available!"); + } // get the engine and ensure its invocable + ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); if (scriptEngine instanceof Invocable) { final Invocable invocable = (Invocable) scriptEngine; - // Find a custom configurator and invoke their eval() method - ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); - if 
(configurator != null) { - configurator.reset(); - configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); - } else { - // evaluate the script - scriptEngine.eval(scriptBody); - } + // evaluate the script + scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE)); + // get configured processor from the script (if it exists) - final Object obj = scriptEngine.get("actionHandler"); + final Object obj = scriptRunner.getScriptEngine().get("actionHandler"); if (obj != null) { final ComponentLog logger = getLogger(); @@ -190,53 +184,63 @@ public class ScriptedActionHandler extends AbstractScriptedControllerService imp super.onEnabled(context); // Call an non-interface method onEnabled(context), to allow a scripted ActionHandler the chance to set up as necessary - final Invocable invocable = (Invocable) scriptEngine; - if (configurationContext != null) { - try { - // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods, - // where ActionHandler is a proxied interface - final Object obj = scriptEngine.get("actionHandler"); - if (obj != null) { - try { - invocable.invokeMethod(obj, "onEnabled", context); - } catch (final NoSuchMethodException nsme) { - if (getLogger().isDebugEnabled()) { - getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method."); + if (scriptRunner != null) { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + final Invocable invocable = (Invocable) scriptEngine; + if (configurationContext != null) { + try { + // Get the actual object from the script engine, versus the proxy stored in ActionHandler. 
The object may have additional methods, + // where ActionHandler is a proxied interface + final Object obj = scriptRunner.getScriptEngine().get("actionHandler"); + if (obj != null) { + try { + invocable.invokeMethod(obj, "onEnabled", context); + } catch (final NoSuchMethodException nsme) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method."); + } } + } else { + throw new ScriptException("No ActionHandler was defined by the script."); } - } else { - throw new ScriptException("No ActionHandler was defined by the script."); + } catch (ScriptException se) { + throw new ProcessException("Error executing onEnabled(context) method", se); } - } catch (ScriptException se) { - throw new ProcessException("Error executing onEnabled(context) method", se); } + } else { + throw new ProcessException("Error creating ScriptRunner"); } } public void execute(PropertyContext context, Action action, Map<String, Object> facts) { // Attempt to call a non-ActionHandler interface method (i.e. execute(context, action, facts) from PropertyContextActionHandler) - final Invocable invocable = (Invocable) scriptEngine; + if (scriptRunner != null) { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + final Invocable invocable = (Invocable) scriptEngine; - try { - // Get the actual object from the script engine, versus the proxy stored in ActionHandler. 
The object may have additional methods, - // where ActionHandler is a proxied interface - final Object obj = scriptEngine.get("actionHandler"); - if (obj != null) { - try { - invocable.invokeMethod(obj, "execute", context, action, facts); - } catch (final NoSuchMethodException nsme) { - if (getLogger().isDebugEnabled()) { - getLogger().debug("Configured script ActionHandler is not a PropertyContextActionHandler and has no execute(context, action, facts) method, falling back to" - + "execute(action, facts)."); + try { + // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods, + // where ActionHandler is a proxied interface + final Object obj = scriptRunner.getScriptEngine().get("actionHandler"); + if (obj != null) { + try { + invocable.invokeMethod(obj, "execute", context, action, facts); + } catch (final NoSuchMethodException nsme) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Configured script ActionHandler is not a PropertyContextActionHandler and has no execute(context, action, facts) method, falling back to " + + "execute(action, facts)."); + } + execute(action, facts); } - execute(action, facts); + } else { + throw new ScriptException("No ActionHandler was defined by the script."); } - } else { - throw new ScriptException("No ActionHandler was defined by the script."); + } catch (ScriptException se) { + throw new ProcessException("Error executing execute(context, action, facts) method: " + se.getMessage(), se); } - } catch (ScriptException se) { - throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se); + } else { + throw new ProcessException("Error creating ScriptRunner"); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java index 52a9e0c..35ea374 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java @@ -25,8 +25,8 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.script.ScriptRunner; -import javax.script.ScriptEngine; import java.io.FileInputStream; import java.nio.charset.Charset; import java.util.ArrayList; @@ -46,7 +46,7 @@ public abstract class AbstractScriptedControllerService extends AbstractControll protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); - protected volatile ScriptEngine scriptEngine = null; + protected volatile ScriptRunner scriptRunner = null; protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper(); protected volatile ConfigurationContext configurationContext = null; @@ -65,8 +65,7 @@ public abstract class AbstractScriptedControllerService extends AbstractControll scriptingComponentHelper.createResources(); } } - List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(); - supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors()); + List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(scriptingComponentHelper.getDescriptors()); return Collections.unmodifiableList(supportedPropertyDescriptors); } @@ -112,7 +111,7 @@ public abstract class AbstractScriptedControllerService extends AbstractControll scriptNeedsReload.set(true); // Need to reset scriptEngine if 
the value has changed if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor) || ScriptingComponentUtils.MODULES.equals(descriptor)) { - scriptEngine = null; + scriptRunner = null; } } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptRunnerFactory.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptRunnerFactory.java new file mode 100644 index 0000000..1b90851 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptRunnerFactory.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.script;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.script.ScriptRunner;
+import org.apache.nifi.script.impl.ClojureScriptRunner;
+import org.apache.nifi.script.impl.GenericScriptRunner;
+import org.apache.nifi.script.impl.GroovyScriptRunner;
+import org.apache.nifi.script.impl.JavascriptScriptRunner;
+import org.apache.nifi.script.impl.JythonScriptRunner;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineFactory;
+import javax.script.ScriptException;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Singleton factory that wraps a freshly created ScriptEngine in the ScriptRunner implementation matching the
+ * engine's language name, and that resolves module paths into classpath URLs for the engines that need them.
+ */
+public class ScriptRunnerFactory {
+
+    private static final ScriptRunnerFactory INSTANCE = new ScriptRunnerFactory();
+
+    private ScriptRunnerFactory() {
+
+    }
+
+    public static ScriptRunnerFactory getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Creates a new script engine from the given factory and wraps it in a ScriptRunner chosen by the factory's
+     * language name. Only the Jython runner receives the module paths; every other runner is given null.
+     *
+     * @param scriptEngineFactory The factory used to create the underlying script engine
+     * @param scriptToRun         The body of the script the runner will execute
+     * @param modulePaths         An array of module paths, passed through to the Jython runner only
+     * @return A ScriptRunner wrapping a new engine instance
+     * @throws ScriptException if no factory is supplied or the runner cannot be created (e.g. the script fails to compile)
+     */
+    public ScriptRunner createScriptRunner(ScriptEngineFactory scriptEngineFactory, String scriptToRun, String[] modulePaths)
+            throws ScriptException {
+        if (scriptEngineFactory == null) {
+            // Fail with a descriptive, declared exception instead of a bare NullPointerException
+            throw new ScriptException("No script engine factory was provided, cannot create a script runner");
+        }
+        ScriptEngine scriptEngine = scriptEngineFactory.getScriptEngine();
+        String scriptEngineName = scriptEngineFactory.getLanguageName();
+        if ("Groovy".equals(scriptEngineName)) {
+            return new GroovyScriptRunner(scriptEngine, scriptToRun, null);
+        }
+        if ("python".equals(scriptEngineName)) {
+            return new JythonScriptRunner(scriptEngine, scriptToRun, modulePaths);
+        }
+        if ("Clojure".equals(scriptEngineName)) {
+            return new ClojureScriptRunner(scriptEngine, scriptToRun, null);
+        }
+        if ("ECMAScript".equals(scriptEngineName)) {
+            return new JavascriptScriptRunner(scriptEngine, scriptToRun, null);
+        }
+        return new GenericScriptRunner(scriptEngine, scriptToRun, null);
+    }
+
+    /**
+     * Scans the given module paths for JARs. The path itself (whether a directory or file) will be added to the list
+     * of returned module URLs, and if a directory is specified, it is scanned for JAR files (files ending with .jar).
+     * Any JAR files found are added to the list of module URLs. This is a convenience method for adding directories
+     * full of JAR files to an ExecuteScript or InvokeScriptedProcessor instance, rather than having to enumerate each
+     * JAR's URL.
+     *
+     * @param scriptEngineName The language name of the script engine; only Clojure, Groovy and ECMAScript receive
+     *                         classpath URLs, any other name yields an empty array
+     * @param modulePaths      An array of module paths to scan/add
+     * @param log              A logger for the calling component, to provide feedback for missing files, e.g.
+     * @return An array of URLs corresponding to all modules determined from the input set of module paths.
+     */
+    public URL[] getModuleURLsForClasspath(String scriptEngineName, String[] modulePaths, ComponentLog log) {
+
+        // Allow-list of engines whose modules are loaded through the classloader. Every clause must be negated:
+        // the method bails out only when the engine is none of the three.
+        if (!"Clojure".equals(scriptEngineName)
+                && !"Groovy".equals(scriptEngineName)
+                && !"ECMAScript".equals(scriptEngineName)) {
+            return new URL[0];
+        }
+
+        if (modulePaths == null) {
+            return new URL[0];
+        }
+
+        List<URL> additionalClasspath = new LinkedList<>();
+        for (String modulePathString : modulePaths) {
+            File modulePath = new File(modulePathString);
+
+            if (!modulePath.exists()) {
+                log.warn("{} does not exist, ignoring", modulePath.getAbsolutePath());
+                continue;
+            }
+
+            // Add the URL of this path
+            try {
+                additionalClasspath.add(modulePath.toURI().toURL());
+            } catch (MalformedURLException mue) {
+                log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
+            }
+
+            // If the path is a directory, we need to scan for JARs and add them to the classpath
+            if (!modulePath.isDirectory()) {
+                continue;
+            }
+            File[] jarFiles = modulePath.listFiles((dir, name) -> (name != null && name.endsWith(".jar")));
+
+            if (jarFiles == null) {
+                continue;
+            }
+            // Add each to the classpath
+            for (File jarFile : jarFiles) {
+                try {
+                    additionalClasspath.add(jarFile.toURI().toURL());
+                } catch (MalformedURLException mue) {
+                    // Report the JAR that failed, not its parent directory
+                    log.warn("{} is not a valid file/folder, ignoring", new Object[]{jarFile.getAbsolutePath()}, mue);
+                }
+            }
+        }
+        return additionalClasspath.toArray(new URL[0]);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptingComponentHelper.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptingComponentHelper.java index 6467086..cf92340 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptingComponentHelper.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptingComponentHelper.java @@ -24,28 +24,27 @@ import org.apache.nifi.components.resource.ResourceReferences; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.script.ScriptRunner; import org.apache.nifi.util.StringUtils; -import javax.script.ScriptEngine; +import javax.script.Invocable; import javax.script.ScriptEngineFactory; import javax.script.ScriptEngineManager; +import javax.script.ScriptException; import java.net.URL; import java.net.URLClassLoader; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,8 +55,6 @@ public class ScriptingComponentHelper { public PropertyDescriptor SCRIPT_ENGINE; - // A map from engine name to a custom configurator for that engine - public final Map<String, 
ScriptEngineConfigurator> scriptEngineConfiguratorMap = new ConcurrentHashMap<>(); public final AtomicBoolean isInitialized = new AtomicBoolean(false); public Map<String, ScriptEngineFactory> scriptEngineFactoryMap; @@ -68,7 +65,7 @@ public class ScriptingComponentHelper { private List<AllowableValue> engineAllowableValues; private ResourceReferences modules; - public BlockingQueue<ScriptEngine> engineQ = null; + public BlockingQueue<ScriptRunner> scriptRunnerQ = null; public String getScriptEngineName() { return scriptEngineName; @@ -135,14 +132,18 @@ public class ScriptingComponentHelper { return results; } + public void createResources() { + createResources(true); + } + /** * This method creates all resources needed for the script processor to function, such as script engines, * script file reloader threads, etc. */ - public void createResources() { + public void createResources(final boolean requireInvocable) { descriptors = new ArrayList<>(); // The following is required for JRuby, should be transparent to everything else. - // Note this is not done in a ScriptEngineConfigurator, as it is too early in the lifecycle. The + // Note this is not done in a ScriptRunner, as it is too early in the lifecycle. The // setting must be there before the factories/engines are loaded. 
System.setProperty("org.jruby.embed.localvariable.behavior", "persistent"); @@ -153,12 +154,14 @@ public class ScriptingComponentHelper { scriptEngineFactoryMap = new HashMap<>(scriptEngineFactories.size()); List<AllowableValue> engineList = new LinkedList<>(); for (ScriptEngineFactory factory : scriptEngineFactories) { - engineList.add(new AllowableValue(factory.getLanguageName())); - scriptEngineFactoryMap.put(factory.getLanguageName(), factory); + if (!requireInvocable || factory.getScriptEngine() instanceof Invocable) { + engineList.add(new AllowableValue(factory.getLanguageName())); + scriptEngineFactoryMap.put(factory.getLanguageName(), factory); + } } // Sort the list by name so the list always looks the same. - Collections.sort(engineList, (o1, o2) -> { + engineList.sort((o1, o2) -> { if (o1 == null) { return o2 == null ? 0 : 1; } @@ -169,7 +172,7 @@ public class ScriptingComponentHelper { }); engineAllowableValues = engineList; - AllowableValue[] engines = engineList.toArray(new AllowableValue[engineList.size()]); + AllowableValue[] engines = engineList.toArray(new AllowableValue[0]); SCRIPT_ENGINE = new PropertyDescriptor.Builder() .name("Script Engine") @@ -200,55 +203,32 @@ public class ScriptingComponentHelper { return path != null && Files.isRegularFile(Paths.get(path)); } - /** - * Performs common setup operations when the processor is scheduled to run. This method assumes the member - * variables associated with properties have been filled. 
- * - * @param numberOfScriptEngines number of engines to setup - */ - public void setup(int numberOfScriptEngines, ComponentLog log) { - - if (scriptEngineConfiguratorMap.isEmpty()) { - ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader = - ServiceLoader.load(ScriptEngineConfigurator.class); - for (ScriptEngineConfigurator configurator : configuratorServiceLoader) { - scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator); - } - } - setupEngines(numberOfScriptEngines, log); + public void setupScriptRunners(final int numberOfScriptEngines, final String scriptToRun, final ComponentLog log) { + setupScriptRunners(true, numberOfScriptEngines, scriptToRun, log); } /** - * Configures the specified script engine. First, the engine is loaded and instantiated using the JSR-223 + * Configures the specified script engine(s) as a queue of ScriptRunners. First, the engine is loaded and instantiated using the JSR-223 * javax.script APIs. Then, if any script configurators have been defined for this engine, their init() method is * called, and the configurator is saved for future calls. 
* * @param numberOfScriptEngines number of engines to setup - * @see org.apache.nifi.processors.script.ScriptEngineConfigurator + * @see org.apache.nifi.processors.script.ScriptRunner */ - protected void setupEngines(int numberOfScriptEngines, ComponentLog log) { - engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines); + public void setupScriptRunners(final boolean newQ, final int numberOfScriptEngines, final String scriptToRun, final ComponentLog log) { + if (newQ) { + scriptRunnerQ = new LinkedBlockingQueue<>(numberOfScriptEngines); + } ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); try { if (StringUtils.isBlank(scriptEngineName)) { throw new IllegalArgumentException("The script engine name cannot be null"); } - ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); - // Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs - URL[] additionalClasspathURLs = null; - if (configurator != null) { - final String[] locations = modules.asLocations().toArray(new String[0]); - additionalClasspathURLs = configurator.getModuleURLsForClasspath(locations, log); - } else { - if (modules != null) { - final List<URL> urls = modules.asURLs(); - if (!urls.isEmpty()) { - additionalClasspathURLs = urls.toArray(new URL[urls.size()]); - } - } - } + final String[] locations = modules.asLocations().toArray(new String[0]); + final URL[] additionalClasspathURLs = ScriptRunnerFactory.getInstance().getModuleURLsForClasspath(scriptEngineName, locations, log); + // Need the right classloader when the engine is created. 
This ensures the NAR's execution class loader // (plus the module path) becomes the parent for the script engine @@ -259,11 +239,17 @@ public class ScriptingComponentHelper { Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); } - for (int i = 0; i < numberOfScriptEngines; i++) { - ScriptEngine scriptEngine = createScriptEngine(); - if (!engineQ.offer(scriptEngine)) { - log.error("Error adding script engine {}", new Object[]{scriptEngine.getFactory().getEngineName()}); + try { + for (int i = 0; i < numberOfScriptEngines; i++) { + // + ScriptEngineFactory factory = scriptEngineFactoryMap.get(scriptEngineName); + ScriptRunner scriptRunner = ScriptRunnerFactory.getInstance().createScriptRunner(factory, scriptToRun, locations); + if (!scriptRunnerQ.offer(scriptRunner)) { + log.error("Error adding script engine {}", scriptRunner.getScriptEngineName()); + } } + } catch (ScriptException se) { + throw new ProcessException("Could not instantiate script engines", se); } } finally { // Restore original context class loader @@ -278,27 +264,9 @@ public class ScriptingComponentHelper { modules = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().asResources().flattenRecursively(); } - - /** - * Provides a ScriptEngine corresponding to the currently selected script engine name. - * ScriptEngineManager.getEngineByName() doesn't use find ScriptEngineFactory.getName(), which - * is what we used to populate the list. So just search the list of factories until a match is - * found, then create and return a script engine. - * - * @return a Script Engine corresponding to the currently specified name, or null if none is found. 
- */ - protected ScriptEngine createScriptEngine() { - // - ScriptEngineFactory factory = scriptEngineFactoryMap.get(scriptEngineName); - if (factory == null) { - return null; - } - return factory.getScriptEngine(); - } - public void stop() { - if (engineQ != null) { - engineQ.clear(); + if (scriptRunnerQ != null) { + scriptRunnerQ.clear(); } } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/AbstractModuleClassloaderConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/AbstractModuleClassloaderConfigurator.java deleted file mode 100644 index 2285b1f..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/AbstractModuleClassloaderConfigurator.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.script.impl; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; - -import java.io.File; -import java.io.FilenameFilter; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.LinkedList; -import java.util.List; - -/** - * This base class provides a common implementation for the getModuleURLsForClasspath method of the - * ScriptEngineConfigurator interface - */ -public abstract class AbstractModuleClassloaderConfigurator implements ScriptEngineConfigurator { - - /** - * Scans the given module paths for JARs. The path itself (whether a directory or file) will be added to the list - * of returned module URLs, and if a directory is specified, it is scanned for JAR files (files ending with .jar). - * Any JAR files found are added to the list of module URLs. This is a convenience method for adding directories - * full of JAR files to an ExecuteScript or InvokeScriptedProcessor instance, rather than having to enumerate each - * JAR's URL. - * @param modulePaths An array of module paths to scan/add - * @param log A logger for the calling component, to provide feedback for missing files, e.g. - * @return An array of URLs corresponding to all modules determined from the input set of module paths. 
- */ - @Override - public URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log) { - List<URL> additionalClasspath = new LinkedList<>(); - if (modulePaths != null) { - for (String modulePathString : modulePaths) { - File modulePath = new File(modulePathString); - - if (modulePath.exists()) { - // Add the URL of this path - try { - additionalClasspath.add(modulePath.toURI().toURL()); - } catch (MalformedURLException mue) { - log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue); - } - - // If the path is a directory, we need to scan for JARs and add them to the classpath - if (modulePath.isDirectory()) { - File[] jarFiles = modulePath.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return (name != null && name.endsWith(".jar")); - } - }); - - if (jarFiles != null) { - // Add each to the classpath - for (File jarFile : jarFiles) { - try { - additionalClasspath.add(jarFile.toURI().toURL()); - - } catch (MalformedURLException mue) { - log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue); - } - } - } - } - } else { - log.warn("{} does not exist, ignoring", new Object[]{modulePath.getAbsolutePath()}); - } - } - } - return additionalClasspath.toArray(new URL[additionalClasspath.size()]); - } -} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/BaseScriptRunner.java similarity index 57% copy from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java copy to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/BaseScriptRunner.java index f89d06f..d3ff616 
100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/BaseScriptRunner.java @@ -16,27 +16,28 @@ */ package org.apache.nifi.script.impl; +import org.apache.nifi.processors.script.ScriptRunner; + import javax.script.ScriptEngine; -import javax.script.ScriptException; /** - * This class offers methods to perform Javascript-specific operations during the script engine lifecycle. + * This base class provides a common implementation for the member variables underlying the + * ScriptRunner interface */ -public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator { +public abstract class BaseScriptRunner implements ScriptRunner { - @Override - public String getScriptEngineName() { - return "ECMAScript"; - } + protected ScriptEngine scriptEngine; + protected String scriptBody; + protected String[] modulePaths; - @Override - public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - // No initialization methods needed at present - return engine; + public BaseScriptRunner(final ScriptEngine engine, final String scriptBody, final String[] modulePaths) { + this.scriptEngine = engine; + this.scriptBody = scriptBody; + this.modulePaths = modulePaths; } @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - return engine.eval(scriptBody); + public ScriptEngine getScriptEngine() { + return scriptEngine; } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptRunner.java similarity index 77% rename 
from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptEngineConfigurator.java rename to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptRunner.java index 3754d4c..67eb59b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptEngineConfigurator.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptRunner.java @@ -18,10 +18,12 @@ package org.apache.nifi.script.impl; import org.apache.nifi.processors.script.engine.ClojureScriptEngine; +import javax.script.Bindings; +import javax.script.ScriptContext; import javax.script.ScriptEngine; import javax.script.ScriptException; -public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator { +public class ClojureScriptRunner extends BaseScriptRunner { private static final String PRELOADS = "(:import \n" @@ -37,37 +39,30 @@ public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderCo + "[org.apache.nifi.processor.exception FlowFileAccessException FlowFileHandlingException MissingFlowFileException ProcessException]\n" + "[org.apache.nifi.processor.io InputStreamCallback OutputStreamCallback StreamCallback]\n" + "[org.apache.nifi.processor.util FlowFileFilters StandardValidators]\n" - + "[org.apache.nifi.processors.script ExecuteScript InvokeScriptedProcessor ScriptEngineConfigurator]\n" + + "[org.apache.nifi.processors.script ExecuteScript InvokeScriptedProcessor ScriptRunner]\n" + "[org.apache.nifi.script ScriptingComponentHelper ScriptingComponentUtils]\n" + "[org.apache.nifi.logging ComponentLog]\n" + "[org.apache.nifi.lookup LookupService RecordLookupService StringLookupService LookupFailureException]\n" + "[org.apache.nifi.record.sink RecordSinkService]\n" + ")\n"; - - private 
ScriptEngine scriptEngine; + public ClojureScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) { + super(engine, scriptBody, modulePaths); + } @Override public String getScriptEngineName() { return "Clojure"; } - - @Override - public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - scriptEngine = engine; - return scriptEngine; - } - @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - scriptEngine = engine; - StringBuilder sb = new StringBuilder("(ns "); - sb.append(((ClojureScriptEngine) scriptEngine).getNamespace()); - sb.append(" "); - sb.append(PRELOADS); - sb.append(")\n"); - sb.append(scriptBody); - return engine.eval(sb.toString()); + public void run(Bindings bindings) throws ScriptException { + String sb = "(ns " + ((ClojureScriptEngine) scriptEngine).getNamespace() + + " " + + PRELOADS + + ")\n" + + scriptBody; + scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE); + scriptEngine.eval(sb); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GenericScriptRunner.java similarity index 63% copy from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java copy to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GenericScriptRunner.java index f89d06f..028c1af 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java +++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GenericScriptRunner.java @@ -16,27 +16,29 @@ */ package org.apache.nifi.script.impl; +import javax.script.Bindings; import javax.script.ScriptEngine; import javax.script.ScriptException; /** - * This class offers methods to perform Javascript-specific operations during the script engine lifecycle. + * This class offers methods to perform operations during the script runner lifecycle. */ -public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator { +public class GenericScriptRunner extends BaseScriptRunner { - @Override - public String getScriptEngineName() { - return "ECMAScript"; + private String engineName = "Unknown"; + + public GenericScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) { + super(engine, scriptBody, modulePaths); + this.engineName = engine.getFactory().getEngineName(); } @Override - public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - // No initialization methods needed at present - return engine; + public String getScriptEngineName() { + return engineName; } @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - return engine.eval(scriptBody); + public void run(Bindings bindings) throws ScriptException { + scriptEngine.eval(scriptBody); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptRunner.java similarity index 71% rename from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptEngineConfigurator.java rename to 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptRunner.java index b86d356..a99cbfa 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptEngineConfigurator.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptRunner.java @@ -16,10 +16,12 @@ */ package org.apache.nifi.script.impl; +import javax.script.Bindings; +import javax.script.ScriptContext; import javax.script.ScriptEngine; import javax.script.ScriptException; -public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator { +public class GroovyScriptRunner extends BaseScriptRunner { private static final String PRELOADS = "import org.apache.nifi.components.*\n" @@ -34,26 +36,18 @@ public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderCon + "import org.apache.nifi.record.sink.*\n" + "import org.apache.nifi.lookup.*\n"; - private ScriptEngine scriptEngine; + public GroovyScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) { + super(engine, scriptBody, modulePaths); + } @Override public String getScriptEngineName() { return "Groovy"; } - - - @Override - public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - // No need to compile the script here, Groovy does it under the hood and its CompiledScript object just - // calls engine.eval() the same as we do in the eval() method below - scriptEngine = engine; - return scriptEngine; - } - @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - scriptEngine = engine; - return engine.eval(PRELOADS + scriptBody); + public void run(Bindings bindings) throws ScriptException { + scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE); + 
scriptEngine.eval(PRELOADS + scriptBody); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptRunner.java similarity index 69% rename from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java rename to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptRunner.java index f89d06f..39dc3b4 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptRunner.java @@ -16,27 +16,26 @@ */ package org.apache.nifi.script.impl; +import javax.script.Bindings; import javax.script.ScriptEngine; import javax.script.ScriptException; /** - * This class offers methods to perform Javascript-specific operations during the script engine lifecycle. + * This class offers methods to perform Javascript-specific operations during the script runner lifecycle. 
*/ -public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator { +public class JavascriptScriptRunner extends BaseScriptRunner { - @Override - public String getScriptEngineName() { - return "ECMAScript"; + public JavascriptScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) { + super(engine, scriptBody, modulePaths); } @Override - public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - // No initialization methods needed at present - return engine; + public String getScriptEngineName() { + return "ECMAScript"; } @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - return engine.eval(scriptBody); + public void run(Bindings bindings) throws ScriptException { + scriptEngine.eval(scriptBody); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptEngineConfigurator.java deleted file mode 100644 index 14e291f..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptEngineConfigurator.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.script.impl; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; -import org.python.core.PyString; - -import javax.script.Compilable; -import javax.script.CompiledScript; -import javax.script.ScriptEngine; -import javax.script.ScriptException; -import java.net.URL; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -/** - * A helper class to configure the Jython engine with any specific requirements - */ -public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator { - - private final AtomicReference<CompiledScript> compiledScriptRef = new AtomicReference<>(); - - @Override - public String getScriptEngineName() { - return "python"; - } - - @Override - public URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log) { - // We don't need to add the module paths to the classpath, they will be added via sys.path.append - return new URL[0]; - } - - @Override - public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - // Always compile when first run - if (engine != null && compiledScriptRef.get() == null) { - // Add prefix for import sys and all jython modules - String prefix = "import sys\n" - + Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")") - .collect(Collectors.joining("\n")); - final CompiledScript compiled = ((Compilable) 
engine).compile(prefix + scriptBody); - compiledScriptRef.set(compiled); - } - return compiledScriptRef.get(); - } - - @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - Object returnValue = null; - if (engine != null) { - final CompiledScript existing = compiledScriptRef.get(); - if (existing == null) { - throw new ScriptException("Jython script has not been compiled successfully, the component must be restarted."); - } - returnValue = compiledScriptRef.get().eval(); - } - return returnValue; - } - - @Override - public void reset() { - compiledScriptRef.set(null); - } -} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptRunner.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptRunner.java new file mode 100644 index 0000000..da315b0 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptRunner.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.script.impl; + +import org.python.core.PyString; + +import javax.script.Bindings; +import javax.script.Compilable; +import javax.script.CompiledScript; +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * A helper class to configure the Jython engine with any specific requirements + */ +public class JythonScriptRunner extends BaseScriptRunner { + + private final CompiledScript compiledScript; + + public JythonScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { + super(engine, scriptBody, modulePaths); + // Add prefix for import sys and all jython modules + String prefix = "import sys\n" + + Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")") + .collect(Collectors.joining("\n")); + compiledScript = ((Compilable) engine).compile(prefix + scriptBody); + } + + @Override + public String getScriptEngineName() { + return "python"; + } + + @Override + public ScriptEngine getScriptEngine() { + return scriptEngine; + } + + @Override + public void run(Bindings bindings) throws ScriptException { + if (compiledScript == null) { + throw new ScriptException("Jython script has not been successfully compiled"); + } + compiledScript.eval(); + } +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.script.ScriptEngineConfigurator b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.script.ScriptEngineConfigurator index fa53e29..5ee7038 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.script.ScriptEngineConfigurator +++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.script.ScriptEngineConfigurator @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.script.impl.ClojureScriptEngineConfigurator -org.apache.nifi.script.impl.JythonScriptEngineConfigurator -org.apache.nifi.script.impl.GroovyScriptEngineConfigurator -org.apache.nifi.script.impl.JavascriptScriptEngineConfigurator +org.apache.nifi.script.impl.ClojureScriptRunner +org.apache.nifi.script.impl.JythonScriptRunner +org.apache.nifi.script.impl.GroovyScriptRunner +org.apache.nifi.script.impl.JavascriptScriptRunner diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskTest.groovy index 47fff2a..609fbb5 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskTest.groovy @@ -96,11 +96,12 @@ class ScriptedReportingTaskTest { task.onTrigger context // This script should return a variable x with the number of events and a variable e with the first event - def se = task.scriptEngine + def sr = task.scriptRunner + def se = sr.scriptEngine assertEquals 3, se.x assertEquals '1234', se.e.componentId assertEquals 'xyz', se.e.attributes.abc - task.offerScriptEngine(se) + task.offerScriptRunner(sr) } private ProvenanceEventRecord createProvenanceEvent(final long id) { @@ -138,10 +139,11 @@ class ScriptedReportingTaskTest { task.setup configurationContext task.onTrigger context - def se = 
task.scriptEngine + def sr = task.scriptRunner + def se = sr.scriptEngine // This script should store a variable called x with a map of stats to values assertTrue se.x?.uptime >= 0 - task.offerScriptEngine(se) + task.offerScriptRunner(sr) } @@ -171,20 +173,21 @@ class ScriptedReportingTaskTest { task.setup configurationContext task.onTrigger context - def se = task.scriptEngine + def sr = task.scriptRunner + def se = sr.scriptEngine // This script should store a variable called x with a map of stats to values assertTrue se.x?.uptime >= 0 - task.offerScriptEngine(se) + task.offerScriptRunner(sr) } class MockScriptedReportingTask extends ScriptedReportingTask implements AccessibleScriptingComponentHelper { - def getScriptEngine() { - return scriptingComponentHelper.engineQ.poll() + def getScriptRunner() { + return scriptingComponentHelper.scriptRunnerQ.poll() } - def offerScriptEngine(engine) { - scriptingComponentHelper.engineQ.offer(engine) + def offerScriptRunner(runner) { + scriptingComponentHelper.scriptRunnerQ.offer(runner) } @Override diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy index b4908cd..03395a2 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy @@ -1,11 +1,3 @@ -import org.apache.nifi.components.PropertyDescriptor -import org.apache.nifi.components.ValidationResult -import org.apache.nifi.processor.ProcessContext -import org.apache.nifi.processor.ProcessSessionFactory -import org.apache.nifi.processor.ProcessorInitializationContext -import org.apache.nifi.processor.Relationship -import org.apache.nifi.processor.exception.ProcessException - /* * Licensed to the Apache Software 
Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -67,7 +59,7 @@ class GroovyProcessor implements Processor { def session = sessionFactory.createSession() def flowFile = session.get(); if (flowFile == null) { - return; + return } flowFile = session.putAttribute(flowFile, 'from-content', setAttributeFromThisInOnScheduled) // transfer @@ -102,4 +94,4 @@ class GroovyProcessor implements Processor { } } -processor = new GroovyProcessor(); \ No newline at end of file +processor = new GroovyProcessor() \ No newline at end of file