NIFI-3938: Added ScriptedLookupService, some refactor for reusable scripting classes
This closes #1828. Signed-off-by: Andy LoPresto <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9294a261 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9294a261 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9294a261 Branch: refs/heads/master Commit: 9294a2613913f28789a1989744c5c79108768eeb Parents: d4f0c1d Author: Matt Burgess <[email protected]> Authored: Fri May 19 11:01:43 2017 -0400 Committer: Andy LoPresto <[email protected]> Committed: Fri May 19 11:20:38 2017 -0700 ---------------------------------------------------------------------- .../nifi-scripting-processors/pom.xml | 4 + .../lookup/script/ScriptedLookupService.java | 375 +++++++++++++++++++ .../nifi/processors/script/ExecuteScript.java | 2 + .../script/InvokeScriptedProcessor.java | 2 + .../script/ScriptingComponentHelper.java | 318 ---------------- .../script/ScriptingComponentUtils.java | 67 ---- .../AbstractModuleClassloaderConfigurator.java | 88 ----- .../impl/ClojureScriptEngineConfigurator.java | 70 ---- .../impl/GroovyScriptEngineConfigurator.java | 55 --- .../JavascriptScriptEngineConfigurator.java | 42 --- .../impl/JythonScriptEngineConfigurator.java | 63 ---- .../script/AbstractScriptedRecordFactory.java | 182 +-------- .../reporting/script/ScriptedReportingTask.java | 15 +- .../AbstractScriptedControllerService.java | 195 ++++++++++ .../nifi/script/ScriptingComponentHelper.java | 332 ++++++++++++++++ .../nifi/script/ScriptingComponentUtils.java | 67 ++++ .../AbstractModuleClassloaderConfigurator.java | 88 +++++ .../impl/ClojureScriptEngineConfigurator.java | 72 ++++ .../impl/GroovyScriptEngineConfigurator.java | 56 +++ .../JavascriptScriptEngineConfigurator.java | 42 +++ .../impl/JythonScriptEngineConfigurator.java | 63 ++++ ...org.apache.nifi.controller.ControllerService | 3 +- ...i.processors.script.ScriptEngineConfigurator | 8 +- .../script/TestScriptedLookupService.groovy | 112 ++++++ .../script/ExecuteScriptGroovyTest.groovy | 1 + .../record/script/ScriptedReaderTest.groovy | 4 +- .../script/ScriptedRecordSetWriterTest.groovy | 4 +- .../ScriptedReportingTaskGroovyTest.groovy | 4 +- .../AccessibleScriptingComponentHelper.java | 2 + .../nifi/processors/script/BaseScriptTest.java | 1 + .../processors/script/TestExecuteClojure.java | 1 + .../processors/script/TestExecuteGroovy.java | 1 + .../processors/script/TestExecuteJRuby.java | 1 + .../script/TestExecuteJavascript.java | 1 + .../processors/script/TestExecuteJython.java | 1 + .../nifi/processors/script/TestExecuteLua.java | 1 + .../processors/script/TestInvokeGroovy.java | 1 + .../processors/script/TestInvokeJavascript.java | 1 + .../processors/script/TestInvokeJython.java | 1 + .../resources/groovy/test_lookup_inline.groovy | 71 ++++ 40 files changed, 1512 insertions(+), 905 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml index 6721d8a..94daeb4 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml @@ -47,6 +47,10 @@ <artifactId>nifi-record</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-lookup-service-api</artifactId> + </dependency> + <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-all</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java new file mode 100644 index 0000000..da846ec --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.lookup.script; + +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.script.ScriptEngineConfigurator; +import org.apache.nifi.script.ScriptingComponentHelper; +import org.apache.nifi.script.ScriptingComponentUtils; +import org.apache.nifi.script.AbstractScriptedControllerService; + +import javax.script.Invocable; +import javax.script.ScriptException; +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A Controller service that allows the user to script the lookup operation to be performed (by LookupRecord, e.g.) + */ +@Tags({"lookup", "record", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"}) +@CapabilityDescription("Allows the user to provide a scripted LookupService instance in order to enrich records from an incoming flow file.") +@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") +public class ScriptedLookupService extends AbstractScriptedControllerService implements LookupService<Object> { + + protected final AtomicReference<LookupService<Object>> lookupService = new AtomicReference<>(); + + private volatile String kerberosServicePrincipal = null; + private volatile File kerberosConfigFile = null; + private volatile File kerberosServiceKeytab = null; + + @Override + public Optional<Object> lookup(String key) throws LookupFailureException { + // Delegate the lookup() call to the scripted LookupService + return lookupService.get().lookup(key); + } + + @Override + public Class<?> getValueType() { + // Delegate the getValueType() call to the scripted LookupService + return lookupService.get().getValueType(); + } + + @Override + protected void init(final ControllerServiceInitializationContext context) { + kerberosServicePrincipal = context.getKerberosServicePrincipal(); + kerberosConfigFile = context.getKerberosConfigurationFile(); + kerberosServiceKeytab = context.getKerberosServiceKeytab(); + } + + /** + * Returns a list of property descriptors supported by this processor. The + * list always includes properties such as script engine name, script file + * name, script body name, script arguments, and an external module path. If + * the scripted processor also defines supported properties, those are added + * to the list as well. + * + * @return a List of PropertyDescriptor objects supported by this processor + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); + } + } + List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(); + supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors()); + + final ConfigurableComponent instance = lookupService.get(); + if (instance != null) { + try { + final List<PropertyDescriptor> instanceDescriptors = instance.getPropertyDescriptors(); + if (instanceDescriptors != null) { + supportedPropertyDescriptors.addAll(instanceDescriptors); + } + } catch (final Throwable t) { + final ComponentLog logger = getLogger(); + final String message = "Unable to get property descriptors from Processor: " + t; + + logger.error(message); + if (logger.isDebugEnabled()) { + logger.error(message, t); + } + } + } + + return Collections.unmodifiableList(supportedPropertyDescriptors); + } + + /** + * Returns a PropertyDescriptor for the given name. This is for the user to + * be able to define their own properties which will be available as + * variables in the script + * + * @param propertyDescriptorName used to lookup if any property descriptors + * exist for that name + * @return a PropertyDescriptor object corresponding to the specified + * dynamic property name + */ + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + /** + * Handles changes to this processor's properties. If changes are made to + * script- or engine-related properties, the script will be reloaded. + * + * @param descriptor of the modified property + * @param oldValue non-null property value (previous) + * @param newValue the new property value or if null indicates the property + */ + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + final ComponentLog logger = getLogger(); + final ConfigurableComponent instance = lookupService.get(); + + if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor) + || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor) + || ScriptingComponentUtils.MODULES.equals(descriptor) + || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + scriptNeedsReload.set(true); + // Need to reset scriptEngine if the value has changed + if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + scriptEngine = null; + } + } else if (instance != null) { + // If the script provides a ConfigurableComponent, call its onPropertyModified() method + try { + instance.onPropertyModified(descriptor, oldValue, newValue); + } catch (final Exception e) { + final String message = "Unable to invoke onPropertyModified from scripted LookupService: " + e; + logger.error(message, e); + } + } + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); + } + } + super.onEnabled(context); + + // Call an non-interface method onEnabled(context), to allow a scripted LookupService the chance to set up as necessary + final Invocable invocable = (Invocable) scriptEngine; + if (configurationContext != null) { + try { + // Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods, + // where lookupService is a proxied interface + final Object obj = scriptEngine.get("lookupService"); + if (obj != null) { + try { + invocable.invokeMethod(obj, "onEnabled", context); + } catch (final NoSuchMethodException nsme) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Configured script LookupService does not contain an onEnabled() method."); + } + } + } else { + throw new ScriptException("No LookupService was defined by the script."); + } + } catch (ScriptException se) { + throw new ProcessException("Error executing onEnabled(context) method", se); + } + } + } + + @OnDisabled + public void onDisabled(final ConfigurationContext context) { + // Call an non-interface method onDisabled(context), to allow a scripted LookupService the chance to shut down as necessary + final Invocable invocable = (Invocable) scriptEngine; + if (configurationContext != null) { + try { + // Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods, + // where lookupService is a proxied interface + final Object obj = scriptEngine.get("lookupService"); + if (obj != null) { + try { + invocable.invokeMethod(obj, "onDisabled", context); + } catch (final NoSuchMethodException nsme) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Configured script LookupService does not contain an onDisabled() method."); + } + } + } else { + throw new ScriptException("No LookupService was defined by the script."); + } + } catch (ScriptException se) { + throw new ProcessException("Error executing onDisabled(context) method", se); + } + } + } + + public void setup() { + // Create a single script engine, the Processor object is reused by each task + if (scriptEngine == null) { + scriptingComponentHelper.setup(1, getLogger()); + scriptEngine = scriptingComponentHelper.engineQ.poll(); + } + + if (scriptEngine == null) { + throw new ProcessException("No script engine available!"); + } + + if (scriptNeedsReload.get() || lookupService.get() == null) { + if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { + reloadScriptFile(scriptingComponentHelper.getScriptPath()); + } else { + reloadScriptBody(scriptingComponentHelper.getScriptBody()); + } + scriptNeedsReload.set(false); + } + } + + /** + * Reloads the script RecordReaderFactory. This must be called within the lock. + * + * @param scriptBody An input stream associated with the script content + * @return Whether the script was successfully reloaded + */ + protected boolean reloadScript(final String scriptBody) { + // note we are starting here with a fresh listing of validation + // results since we are (re)loading a new/updated script. any + // existing validation results are not relevant + final Collection<ValidationResult> results = new HashSet<>(); + + try { + // get the engine and ensure its invocable + if (scriptEngine instanceof Invocable) { + final Invocable invocable = (Invocable) scriptEngine; + + // Find a custom configurator and invoke their eval() method + ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); + if (configurator != null) { + configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); + } else { + // evaluate the script + scriptEngine.eval(scriptBody); + } + + // get configured LookupService from the script (if it exists) + final Object obj = scriptEngine.get("lookupService"); + if (obj != null) { + final ComponentLog logger = getLogger(); + + try { + // set the logger if the processor wants it + invocable.invokeMethod(obj, "setLogger", logger); + } catch (final NoSuchMethodException nsme) { + if (logger.isDebugEnabled()) { + logger.debug("Scripted LookupService does not contain a setLogger method."); + } + } + + // record the processor for use later + final LookupService<Object> scriptedLookupService = invocable.getInterface(obj, LookupService.class); + lookupService.set(scriptedLookupService); + + if (scriptedLookupService != null) { + try { + scriptedLookupService.initialize(new ControllerServiceInitializationContext() { + @Override + public String getIdentifier() { + return ScriptedLookupService.this.getIdentifier(); + } + + @Override + public ComponentLog getLogger() { + return logger; + } + + @Override + public StateManager getStateManager() { + return ScriptedLookupService.this.getStateManager(); + } + + @Override + public ControllerServiceLookup getControllerServiceLookup() { + return ScriptedLookupService.super.getControllerServiceLookup(); + } + + @Override + public String getKerberosServicePrincipal() { + return ScriptedLookupService.this.kerberosServicePrincipal; + } + + @Override + public File getKerberosServiceKeytab() { + return ScriptedLookupService.this.kerberosServiceKeytab; + } + + @Override + public File getKerberosConfigurationFile() { + return ScriptedLookupService.this.kerberosConfigFile; + } + }); + } catch (final Exception e) { + logger.error("Unable to initialize scripted LookupService: " + e.getLocalizedMessage(), e); + throw new ProcessException(e); + } + } + + } else { + throw new ScriptException("No LookupService was defined by the script."); + } + } else { + throw new ScriptException("Script engine is not Invocable, cannot be used for ScriptedLookupService"); + } + + } catch (final Exception ex) { + final ComponentLog logger = getLogger(); + final String message = "Unable to load script: " + ex.getLocalizedMessage(); + + logger.error(message, ex); + results.add(new ValidationResult.Builder() + .subject("ScriptedLookupServiceValidation") + .valid(false) + .explanation("Unable to load script due to " + ex.getLocalizedMessage()) + .input(scriptingComponentHelper.getScriptPath()) + .build()); + } + + // store the updated validation results + validationResults.set(results); + + // return whether there was any issues loading the configured script + return results.isEmpty(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 999211e..108da98 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -38,6 +38,8 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.script.ScriptingComponentHelper; +import org.apache.nifi.script.ScriptingComponentUtils; import java.nio.charset.Charset; import javax.script.Bindings; http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 165fc75..6abf93e 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -41,6 +41,8 @@ import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.script.ScriptingComponentHelper; +import org.apache.nifi.script.ScriptingComponentUtils; import javax.script.Invocable; import javax.script.ScriptEngine; http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentHelper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentHelper.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentHelper.java deleted file mode 100644 index a89b7b8..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentHelper.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.script; - -import org.apache.nifi.logging.ComponentLog; - -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.script.ScriptEngine; -import javax.script.ScriptEngineFactory; -import javax.script.ScriptEngineManager; -import javax.script.ScriptException; - -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.StringUtils; - -/** - * This class contains variables and methods common to scripting processors, reporting tasks, etc. - */ -public class ScriptingComponentHelper { - - public PropertyDescriptor SCRIPT_ENGINE; - - // A map from engine name to a custom configurator for that engine - public final Map<String, ScriptEngineConfigurator> scriptEngineConfiguratorMap = new ConcurrentHashMap<>(); - public final AtomicBoolean isInitialized = new AtomicBoolean(false); - - public Map<String, ScriptEngineFactory> scriptEngineFactoryMap; - private String scriptEngineName; - private String scriptPath; - private String scriptBody; - private String[] modules; - private List<PropertyDescriptor> descriptors; - - public BlockingQueue<ScriptEngine> engineQ = null; - - public String getScriptEngineName() { - return scriptEngineName; - } - - public void setScriptEngineName(String scriptEngineName) { - this.scriptEngineName = scriptEngineName; - } - - public String getScriptPath() { - return scriptPath; - } - - public void setScriptPath(String scriptPath) { - this.scriptPath = scriptPath; - } - - public String getScriptBody() { - return scriptBody; - } - - public void setScriptBody(String scriptBody) { - this.scriptBody = scriptBody; - } - - public String[] getModules() { - return modules; - } - - public void setModules(String[] modules) { - this.modules = modules; - } - - public List<PropertyDescriptor> getDescriptors() { - return descriptors; - } - - public void setDescriptors(List<PropertyDescriptor> descriptors) { - this.descriptors = descriptors; - } - - /** - * Custom validation for ensuring exactly one of Script File or Script Body is populated - * - * @param validationContext provides a mechanism for obtaining externally - * managed values, such as property values and supplies convenience methods - * for operating on those values - * @return A collection of validation results - */ - public Collection<ValidationResult> customValidate(ValidationContext validationContext) { - Set<ValidationResult> results = new HashSet<>(); - - // Verify that exactly one of "script file" or "script body" is set - Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties(); - if (StringUtils.isEmpty(propertyMap.get(ScriptingComponentUtils.SCRIPT_FILE)) == StringUtils.isEmpty(propertyMap.get(ScriptingComponentUtils.SCRIPT_BODY))) { - results.add(new ValidationResult.Builder().valid(false).explanation( - "Exactly one of Script File or Script Body must be set").build()); - } - - return results; - } - - /** - * This method creates all resources needed for the script processor to function, such as script engines, - * script file reloader threads, etc. - */ - public void createResources() { - descriptors = new ArrayList<>(); - // The following is required for JRuby, should be transparent to everything else. - // Note this is not done in a ScriptEngineConfigurator, as it is too early in the lifecycle. The - // setting must be there before the factories/engines are loaded. - System.setProperty("org.jruby.embed.localvariable.behavior", "persistent"); - - // Create list of available engines - ScriptEngineManager scriptEngineManager = new ScriptEngineManager(); - List<ScriptEngineFactory> scriptEngineFactories = scriptEngineManager.getEngineFactories(); - if (scriptEngineFactories != null) { - scriptEngineFactoryMap = new HashMap<>(scriptEngineFactories.size()); - List<AllowableValue> engineList = new LinkedList<>(); - for (ScriptEngineFactory factory : scriptEngineFactories) { - engineList.add(new AllowableValue(factory.getLanguageName())); - scriptEngineFactoryMap.put(factory.getLanguageName(), factory); - } - - // Sort the list by name so the list always looks the same. - Collections.sort(engineList, (o1, o2) -> { - if (o1 == null) { - return o2 == null ? 0 : 1; - } - if (o2 == null) { - return -1; - } - return o1.getValue().compareTo(o2.getValue()); - }); - - AllowableValue[] engines = engineList.toArray(new AllowableValue[engineList.size()]); - - SCRIPT_ENGINE = new PropertyDescriptor.Builder() - .name("Script Engine") - .required(true) - .description("The engine to execute scripts") - .allowableValues(engines) - .defaultValue(engines[0].getValue()) - .required(true) - .expressionLanguageSupported(false) - .build(); - descriptors.add(SCRIPT_ENGINE); - } - - descriptors.add(ScriptingComponentUtils.SCRIPT_FILE); - descriptors.add(ScriptingComponentUtils.SCRIPT_BODY); - descriptors.add(ScriptingComponentUtils.MODULES); - - isInitialized.set(true); - } - - /** - * Determines whether the given path refers to a valid file - * - * @param path a path to a file - * @return true if the path refers to a valid file, false otherwise - */ - public static boolean isFile(final String path) { - return path != null && Files.isRegularFile(Paths.get(path)); - } - - /** - * Performs common setup operations when the processor is scheduled to run. This method assumes the member - * variables associated with properties have been filled. - * - * @param numberOfScriptEngines number of engines to setup - */ - public void setup(int numberOfScriptEngines, ComponentLog log) { - - if (scriptEngineConfiguratorMap.isEmpty()) { - ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader = - ServiceLoader.load(ScriptEngineConfigurator.class); - for (ScriptEngineConfigurator configurator : configuratorServiceLoader) { - scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator); - } - } - setupEngines(numberOfScriptEngines, log); - } - - /** - * Configures the specified script engine. First, the engine is loaded and instantiated using the JSR-223 - * javax.script APIs. Then, if any script configurators have been defined for this engine, their init() method is - * called, and the configurator is saved for future calls. - * - * @param numberOfScriptEngines number of engines to setup - * @see org.apache.nifi.processors.script.ScriptEngineConfigurator - */ - protected void setupEngines(int numberOfScriptEngines, ComponentLog log) { - engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines); - ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); - try { - if (StringUtils.isBlank(scriptEngineName)) { - throw new IllegalArgumentException("The script engine name cannot be null"); - } - - ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); - - // Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs - URL[] additionalClasspathURLs = null; - if (configurator != null) { - additionalClasspathURLs = configurator.getModuleURLsForClasspath(modules, log); - } else { - if (modules != null) { - List<URL> urls = new LinkedList<>(); - for (String modulePathString : modules) { - try { - urls.add(new File(modulePathString).toURI().toURL()); - } catch (MalformedURLException mue) { - log.error("{} is not a valid file, ignoring", new Object[]{modulePathString}, mue); - } - } - additionalClasspathURLs = urls.toArray(new URL[urls.size()]); - } - } - - // Need the right classloader when the engine is created. This ensures the NAR's execution class loader - // (plus the module path) becomes the parent for the script engine - ClassLoader scriptEngineModuleClassLoader = additionalClasspathURLs != null - ? new URLClassLoader(additionalClasspathURLs, originalContextClassLoader) - : originalContextClassLoader; - if (scriptEngineModuleClassLoader != null) { - Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); - } - - for (int i = 0; i < numberOfScriptEngines; i++) { - ScriptEngine scriptEngine = createScriptEngine(); - try { - if (configurator != null) { - configurator.init(scriptEngine, modules); - } - if (!engineQ.offer(scriptEngine)) { - log.error("Error adding script engine {}", new Object[]{scriptEngine.getFactory().getEngineName()}); - } - - } catch (ScriptException se) { - log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); - if (log.isDebugEnabled()) { - log.error("Error initializing script engine configurator", se); - } - } - } - } finally { - // Restore original context class loader - Thread.currentThread().setContextClassLoader(originalContextClassLoader); - } - } - - void setupVariables(ProcessContext context) { - scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue(); - scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue(); - scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue(); - String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue(); - if (!StringUtils.isEmpty(modulePath)) { - modules = modulePath.split(","); - } else { - modules = new String[0]; - } - } - - /** - * Provides a ScriptEngine corresponding to the currently selected script engine name. - * ScriptEngineManager.getEngineByName() doesn't use find ScriptEngineFactory.getName(), which - * is what we used to populate the list. So just search the list of factories until a match is - * found, then create and return a script engine. - * - * @return a Script Engine corresponding to the currently specified name, or null if none is found. - */ - protected ScriptEngine createScriptEngine() { - // - ScriptEngineFactory factory = scriptEngineFactoryMap.get(scriptEngineName); - if (factory == null) { - return null; - } - return factory.getScriptEngine(); - } - - void stop() { - if (engineQ != null) { - engineQ.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentUtils.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentUtils.java deleted file mode 100644 index 43da7aa..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptingComponentUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.script; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; - -/** - * Utility methods and constants used by the scripting components. - */ -public class ScriptingComponentUtils { - /** A relationship indicating flow files were processed successfully */ - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles that were successfully processed") - .build(); - - /** A relationship indicating an error while processing flow files */ - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles that failed to be processed") - .build(); - - /** A property descriptor for specifying the location of a script file */ - public static final PropertyDescriptor SCRIPT_FILE = new PropertyDescriptor.Builder() - .name("Script File") - .required(false) - .description("Path to script file to execute. Only one of Script File or Script Body may be used") - .addValidator(new StandardValidators.FileExistsValidator(true)) - .expressionLanguageSupported(true) - .build(); - - /** A property descriptor for specifying the body of a script */ - public static final PropertyDescriptor SCRIPT_BODY = new PropertyDescriptor.Builder() - .name("Script Body") - .required(false) - .description("Body of script to execute. Only one of Script File or Script Body may be used") - .addValidator(Validator.VALID) - .expressionLanguageSupported(false) - .build(); - - /** A property descriptor for specifying the location of additional modules to be used by the script */ - public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder() - .name("Module Directory") - .description("Comma-separated list of paths to files and/or directories which contain modules required by the script.") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); -} - http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/AbstractModuleClassloaderConfigurator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/AbstractModuleClassloaderConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/AbstractModuleClassloaderConfigurator.java deleted file mode 100644 index 478a773..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/AbstractModuleClassloaderConfigurator.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.script.impl; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; - -import java.io.File; -import java.io.FilenameFilter; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.LinkedList; -import java.util.List; - -/** - * This base class provides a common implementation for the getModuleURLsForClasspath method of the - * ScriptEngineConfigurator interface - */ -public abstract class AbstractModuleClassloaderConfigurator implements ScriptEngineConfigurator { - - /** - * Scans the given module paths for JARs. The path itself (whether a directory or file) will be added to the list - * of returned module URLs, and if a directory is specified, it is scanned for JAR files (files ending with .jar). - * Any JAR files found are added to the list of module URLs. This is a convenience method for adding directories - * full of JAR files to an ExecuteScript or InvokeScriptedProcessor instance, rather than having to enumerate each - * JAR's URL. - * @param modulePaths An array of module paths to scan/add - * @param log A logger for the calling component, to provide feedback for missing files, e.g. - * @return An array of URLs corresponding to all modules determined from the input set of module paths. - */ - @Override - public URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log) { - List<URL> additionalClasspath = new LinkedList<>(); - if (modulePaths != null) { - for (String modulePathString : modulePaths) { - File modulePath = new File(modulePathString); - - if (modulePath.exists()) { - // Add the URL of this path - try { - additionalClasspath.add(modulePath.toURI().toURL()); - } catch (MalformedURLException mue) { - log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue); - } - - // If the path is a directory, we need to scan for JARs and add them to the classpath - if (modulePath.isDirectory()) { - File[] jarFiles = modulePath.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return (name != null && name.endsWith(".jar")); - } - }); - - if (jarFiles != null) { - // Add each to the classpath - for (File jarFile : jarFiles) { - try { - additionalClasspath.add(jarFile.toURI().toURL()); - - } catch (MalformedURLException mue) { - log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue); - } - } - } - } - } else { - log.warn("{} does not exist, ignoring", new Object[]{modulePath.getAbsolutePath()}); - } - } - } - return additionalClasspath.toArray(new URL[additionalClasspath.size()]); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/ClojureScriptEngineConfigurator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/ClojureScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/ClojureScriptEngineConfigurator.java deleted file mode 100644 index 7501382..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/ClojureScriptEngineConfigurator.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.script.impl; - -import org.apache.nifi.processors.script.engine.ClojureScriptEngine; - -import javax.script.ScriptEngine; -import javax.script.ScriptException; - -public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator { - - private static final String PRELOADS = - "(:import \n" - + "[org.apache.nifi.components " - + "AbstractConfigurableComponent AllowableValue ConfigurableComponent PropertyDescriptor PropertyValue ValidationContext ValidationResult Validator" - + "]\n" - + "[org.apache.nifi.components.state Scope StateManager StateMap]\n" - + "[org.apache.nifi.flowfile FlowFile]\n" - + "[org.apache.nifi.processor " - + "AbstractProcessor AbstractSessionFactoryProcessor DataUnit FlowFileFilter ProcessContext Processor " - + "ProcessorInitializationContext ProcessSession ProcessSessionFactory Relationship SchedulingContext" - + "]\n" - + "[org.apache.nifi.processor.exception FlowFileAccessException FlowFileHandlingException MissingFlowFileException ProcessException]\n" - + "[org.apache.nifi.processor.io InputStreamCallback OutputStreamCallback StreamCallback]\n" - + "[org.apache.nifi.processor.util FlowFileFilters StandardValidators]\n" - + "[org.apache.nifi.processors.script ScriptingComponentHelper ScriptingComponentUtils ExecuteScript InvokeScriptedProcessor ScriptEngineConfigurator]\n" - + "[org.apache.nifi.logging ComponentLog]\n" - + ")\n"; - - - private ScriptEngine scriptEngine; - - @Override - public String getScriptEngineName() { - return "Clojure"; - } - - - @Override - public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException { - scriptEngine = engine; - return scriptEngine; - } - - @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - scriptEngine = engine; - StringBuilder sb = new StringBuilder("(ns "); - sb.append(((ClojureScriptEngine) scriptEngine).getNamespace()); - sb.append(" "); - sb.append(PRELOADS); - sb.append(")\n"); - sb.append(scriptBody); - return engine.eval(sb.toString()); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/GroovyScriptEngineConfigurator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/GroovyScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/GroovyScriptEngineConfigurator.java deleted file mode 100644 index b4c4cd3..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/GroovyScriptEngineConfigurator.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.script.impl; - -import javax.script.ScriptEngine; -import javax.script.ScriptException; - -public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator { - - private static final String PRELOADS = - "import org.apache.nifi.components.*\n" - + "import org.apache.nifi.flowfile.FlowFile\n" - + "import org.apache.nifi.processor.*\n" - + "import org.apache.nifi.processor.exception.*\n" - + "import org.apache.nifi.processor.io.*\n" - + "import org.apache.nifi.processor.util.*\n" - + "import org.apache.nifi.processors.script.*\n" - + "import org.apache.nifi.logging.ComponentLog\n"; - - - private ScriptEngine scriptEngine; - - @Override - public String getScriptEngineName() { - return "Groovy"; - } - - - - @Override - public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException { - scriptEngine = engine; - return scriptEngine; - } - - @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - scriptEngine = engine; - return engine.eval(PRELOADS + scriptBody); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/JavascriptScriptEngineConfigurator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/JavascriptScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/JavascriptScriptEngineConfigurator.java deleted file mode 100644 index 9db099d..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/JavascriptScriptEngineConfigurator.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.script.impl; - -import javax.script.ScriptEngine; -import javax.script.ScriptException; - -/** - * This class offers methods to perform Javascript-specific operations during the script engine lifecycle. - */ -public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator { - - @Override - public String getScriptEngineName() { - return "ECMAScript"; - } - - @Override - public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException { - // No initialization methods needed at present - return engine; - } - - @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - return engine.eval(scriptBody); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/JythonScriptEngineConfigurator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/JythonScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/JythonScriptEngineConfigurator.java deleted file mode 100644 index 3bff46b..0000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/impl/JythonScriptEngineConfigurator.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.script.impl; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.script.ScriptEngineConfigurator; - -import javax.script.ScriptEngine; -import javax.script.ScriptException; -import java.net.URL; - -/** - * A helper class to configure the Jython engine with any specific requirements - */ -public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator { - - @Override - public String getScriptEngineName() { - return "python"; - } - - @Override - public URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log) { - // We don't need to add the module paths to the classpath, they will be added via sys.path.append - return new URL[0]; - } - - @Override - public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException { - return null; - } - - @Override - public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException { - Object returnValue = null; - if (engine != null) { - // Need to import the module path inside the engine, in order to pick up - // other Python/Jython modules - engine.eval("import sys"); - if (modulePaths != null) { - for (String modulePath : modulePaths) { - engine.eval("sys.path.append('" + modulePath + "')"); - } - } - returnValue = engine.eval(scriptBody); - } - return returnValue; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java index 9b70fe7..3c694a7 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java @@ -16,130 +16,19 @@ */ package org.apache.nifi.record.script; -import org.apache.commons.io.IOUtils; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.script.ScriptingComponentHelper; -import org.apache.nifi.processors.script.ScriptingComponentUtils; -import org.apache.nifi.util.StringUtils; +import org.apache.nifi.script.ScriptingComponentHelper; +import org.apache.nifi.script.AbstractScriptedControllerService; -import javax.script.ScriptEngine; -import java.io.FileInputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * An abstract base class containing code common to the Scripted record reader/writer implementations */ -public abstract class AbstractScriptedRecordFactory<T> extends AbstractControllerService { +public abstract class AbstractScriptedRecordFactory<T> extends AbstractScriptedControllerService { protected final AtomicReference<T> recordFactory = new AtomicReference<>(); - protected final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>()); - - protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); - - protected volatile ScriptEngine scriptEngine = null; - protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper(); - protected volatile ConfigurationContext configurationContext = null; - - /** - * Returns a list of property descriptors supported by this record reader. The - * list always includes properties such as script engine name, script file - * name, script body name, script arguments, and an external module path. - * - * @return a List of PropertyDescriptor objects supported by this processor - */ - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - - synchronized (scriptingComponentHelper.isInitialized) { - if (!scriptingComponentHelper.isInitialized.get()) { - scriptingComponentHelper.createResources(); - } - } - List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(); - supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors()); - - return Collections.unmodifiableList(supportedPropertyDescriptors); - } - - /** - * Returns a PropertyDescriptor for the given name. This is for the user to - * be able to define their own properties which will be available as - * variables in the script - * - * @param propertyDescriptorName used to lookup if any property descriptors - * exist for that name - * @return a PropertyDescriptor object corresponding to the specified - * dynamic property name - */ - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) - .build(); - } - - /** - * Handles changes to this processor's properties. If changes are made to - * script- or engine-related properties, the script will be reloaded. - * - * @param descriptor of the modified property - * @param oldValue non-null property value (previous) - * @param newValue the new property value or if null indicates the property - */ - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - - if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor) - || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor) - || ScriptingComponentUtils.MODULES.equals(descriptor) - || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { - scriptNeedsReload.set(true); - // Need to reset scriptEngine if the value has changed - if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { - scriptEngine = null; - } - } - } - - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - return scriptingComponentHelper.customValidate(validationContext); - } - - public void onEnabled(final ConfigurationContext context) { - this.configurationContext = context; - - scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue()); - scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue()); - scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue()); - String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue(); - if (!StringUtils.isEmpty(modulePath)) { - scriptingComponentHelper.setModules(modulePath.split(",")); - } else { - scriptingComponentHelper.setModules(new String[0]); - } - setup(); - } - public void setup() { // Create a single script engine, the Processor object is reused by each task if (scriptEngine == null) { @@ -160,69 +49,4 @@ public abstract class AbstractScriptedRecordFactory<T> extends AbstractControlle scriptNeedsReload.set(false); } } - - /** - * Reloads the script located at the given path - * - * @param scriptPath the path to the script file to be loaded - * @return true if the script was loaded successfully; false otherwise - */ - private boolean reloadScriptFile(final String scriptPath) { - final Collection<ValidationResult> results = new HashSet<>(); - - try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) { - return reloadScript(IOUtils.toString(scriptStream, Charset.defaultCharset())); - - } catch (final Exception e) { - final ComponentLog logger = getLogger(); - final String message = "Unable to load script: " + e; - - logger.error(message, e); - results.add(new ValidationResult.Builder() - .subject("ScriptValidation") - .valid(false) - .explanation("Unable to load script due to " + e) - .input(scriptPath) - .build()); - } - - // store the updated validation results - validationResults.set(results); - - // return whether there was any issues loading the configured script - return results.isEmpty(); - } - - /** - * Reloads the script defined by the given string - * - * @param scriptBody the contents of the script to be loaded - * @return true if the script was loaded successfully; false otherwise - */ - private boolean reloadScriptBody(final String scriptBody) { - final Collection<ValidationResult> results = new HashSet<>(); - try { - return reloadScript(scriptBody); - - } catch (final Exception e) { - final ComponentLog logger = getLogger(); - final String message = "Unable to load script: " + e; - - logger.error(message, e); - results.add(new ValidationResult.Builder() - .subject("ScriptValidation") - .valid(false) - .explanation("Unable to load script due to " + e) - .input(scriptingComponentHelper.getScriptPath()) - .build()); - } - - // store the updated validation results - validationResults.set(results); - - // return whether there was any issues loading the configured script - return results.isEmpty(); - } - - protected abstract boolean reloadScript(final String scriptBody); } http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java index 9241d83..05454c3 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java @@ -31,11 +31,9 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.script.ScriptEngineConfigurator; -import org.apache.nifi.processors.script.ScriptingComponentHelper; -import org.apache.nifi.processors.script.ScriptingComponentUtils; +import org.apache.nifi.script.ScriptingComponentHelper; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.util.StringUtils; import javax.script.Bindings; import javax.script.ScriptContext; @@ -119,15 +117,8 @@ public class ScriptedReportingTask extends AbstractReportingTask { */ @OnScheduled public void setup(final ConfigurationContext context) { - scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue()); - scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue()); - scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue()); - String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue(); - if (!StringUtils.isEmpty(modulePath)) { - scriptingComponentHelper.setModules(modulePath.split(",")); - } else { - scriptingComponentHelper.setModules(new String[0]); - } + scriptingComponentHelper.setupVariables(context); + // Create a script engine for each possible task scriptingComponentHelper.setup(1, getLogger()); scriptToRun = scriptingComponentHelper.getScriptBody(); http://git-wip-us.apache.org/repos/asf/nifi/blob/9294a261/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java new file mode 100644 index 0000000..126bba3 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.script; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; + +import javax.script.ScriptEngine; +import java.io.FileInputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An abstract class with common methods and variables for reuse among Controller Services + */ +public abstract class AbstractScriptedControllerService extends AbstractControllerService { + + protected final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>()); + + protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); + + protected volatile ScriptEngine scriptEngine = null; + protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper(); + protected volatile ConfigurationContext configurationContext = null; + + /** + * Returns a list of property descriptors supported by this record reader. The + * list always includes properties such as script engine name, script file + * name, script body name, script arguments, and an external module path. + * + * @return a List of PropertyDescriptor objects supported by this processor + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); + } + } + List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(); + supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors()); + + return Collections.unmodifiableList(supportedPropertyDescriptors); + } + + /** + * Returns a PropertyDescriptor for the given name. This is for the user to + * be able to define their own properties which will be available as + * variables in the script + * + * @param propertyDescriptorName used to lookup if any property descriptors + * exist for that name + * @return a PropertyDescriptor object corresponding to the specified + * dynamic property name + */ + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + /** + * Handles changes to this processor's properties. If changes are made to + * script- or engine-related properties, the script will be reloaded. + * + * @param descriptor of the modified property + * @param oldValue non-null property value (previous) + * @param newValue the new property value or if null indicates the property + */ + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + + if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor) + || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor) + || ScriptingComponentUtils.MODULES.equals(descriptor) + || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + scriptNeedsReload.set(true); + // Need to reset scriptEngine if the value has changed + if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + scriptEngine = null; + } + } + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + return scriptingComponentHelper.customValidate(validationContext); + } + + public void onEnabled(final ConfigurationContext context) { + this.configurationContext = context; + + scriptingComponentHelper.setupVariables(context); + setup(); + } + + abstract public void setup(); + + /** + * Reloads the script located at the given path + * + * @param scriptPath the path to the script file to be loaded + * @return true if the script was loaded successfully; false otherwise + */ + protected boolean reloadScriptFile(final String scriptPath) { + final Collection<ValidationResult> results = new HashSet<>(); + + try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) { + return reloadScript(IOUtils.toString(scriptStream, Charset.defaultCharset())); + + } catch (final Exception e) { + final ComponentLog logger = getLogger(); + final String message = "Unable to load script: " + e; + + logger.error(message, e); + results.add(new ValidationResult.Builder() + .subject("ScriptValidation") + .valid(false) + .explanation("Unable to load script due to " + e) + .input(scriptPath) + .build()); + } + + // store the updated validation results + validationResults.set(results); + + // return whether there was any issues loading the configured script + return results.isEmpty(); + } + + /** + * Reloads the script defined by the given string + * + * @param scriptBody the contents of the script to be loaded + * @return true if the script was loaded successfully; false otherwise + */ + protected boolean reloadScriptBody(final String scriptBody) { + final Collection<ValidationResult> results = new HashSet<>(); + try { + return reloadScript(scriptBody); + + } catch (final Exception e) { + final ComponentLog logger = getLogger(); + final String message = "Unable to load script: " + e; + + logger.error(message, e); + results.add(new ValidationResult.Builder() + .subject("ScriptValidation") + .valid(false) + .explanation("Unable to load script due to " + e) + .input(scriptingComponentHelper.getScriptPath()) + .build()); + } + + // store the updated validation results + validationResults.set(results); + + // return whether there was any issues loading the configured script + return results.isEmpty(); + } + + protected abstract boolean reloadScript(final String scriptBody); +}
