Github user rickysaltzer commented on a diff in the pull request:
https://github.com/apache/nifi/pull/185#discussion_r50780136
--- Diff:
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
---
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineFactory;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby",
"javascript", "js", "lua", "luaj", "scala"})
+@CapabilityDescription("Executes a script given the flow file and a
process session. The script is responsible for "
+ + "handling the incoming flow file (transfer to SUCCESS or remove,
e.g.) as well as any flow files created by "
+ + "the script. If the handling is incomplete or incorrect, the
session will be rolled back.")
+@DynamicProperty(
+ name = "A script engine property to update",
+ value = "The value to set it to",
+ supportsExpressionLanguage = true,
+ description = "Updates a script engine property specified by the
Dynamic Property's key with the value "
+ + "specified by the Dynamic Property's value")
+public class ExecuteScript extends AbstractProcessor {
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("FlowFiles that were successfully processed")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that were failed to process")
+ .build();
+
+ public static PropertyDescriptor SCRIPT_ENGINE;
+
+ public static final PropertyDescriptor SCRIPT_FILE = new
PropertyDescriptor.Builder()
+ .name("Script File")
+ .required(false)
+ .description("Path to script file to execute. Use either file
or body not both")
+ .addValidator(new StandardValidators.FileExistsValidator(true))
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor SCRIPT_BODY = new
PropertyDescriptor.Builder()
+ .name("Script Body")
+ .required(false)
+ .description("Body to script to execute. Use either file or
body not both")
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor SCRIPT_ARGS = new
PropertyDescriptor.Builder()
+ .name("Arguments")
+ .required(false)
+ .description("Arguments to pass to scripting engine")
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .defaultValue("")
+ .build();
+
+ public static final PropertyDescriptor MODULES = new
PropertyDescriptor.Builder()
+ .name("Module Directory")
+ .description("Path to a directory which contains modules
required by the script script.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(new
StandardValidators.DirectoryExistsValidator(true, false))
+ .build();
+
+ // A map from engine name to a custom configurator for that engine
+ private final Map<String, ScriptEngineConfigurator>
scriptEngineConfiguratorMap = new ConcurrentHashMap<>();
+
+ private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+ private final Lock lock = new ReentrantLock();
+ private SynchronousFileWatcher scriptWatcher;
+
+ private Map<String, ScriptEngineFactory> scriptEngineFactoryMap;
+ private ScriptEngine scriptEngine;
+ private String scriptEngineName;
+ private String scriptPath;
+ private String scriptBody;
+ private String modulePath;
+ private CompiledScript compiledScript;
+ private final AtomicBoolean scriptNeedsReload = new
AtomicBoolean(true);
+ private ScheduledExecutorService reloadService;
+ private List<PropertyDescriptor> descriptors;
+
+
+ /**
+ * Initializes this processor. A reload service is defined and
scheduled, for the purpose of watching for
+ * script file changes, which indicates a reload is necessary
+ *
+ * @param context in which to perform initialization
+ */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ }
+
+
+ protected void createResources() {
+
+ // Set up script file reloader service. This checks to see if the
script file has changed, and if so, marks
+ // the script file as needing a reload before evaluation
+ if (reloadService == null) {
+ reloadService = Executors.newScheduledThreadPool(1);
+
+ // monitor the script if configured for changes
+ reloadService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final boolean hasLock = lock.tryLock();
+
+ // if a property is changing we don't need to
reload this iteration
+ if (hasLock) {
+ try {
+ if (scriptWatcher != null &&
scriptWatcher.checkAndReset()) {
+ if (isFile(scriptPath)) {
+ scriptNeedsReload.set(true);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (final Throwable t) {
+ final ProcessorLog logger = getLogger();
+ final String message = "Unable to reload
configured script Processor: " + t;
+
+ logger.error(message);
+ if (logger.isDebugEnabled()) {
+ logger.error(message, t);
+ }
+ }
+ }
+ }, 30, 10, TimeUnit.SECONDS);
+ }
+
+ descriptors = new ArrayList<>();
+
+ // The following is required for JRuby, should be transparent to
everything else.
+ // Note this is not done in a ScriptEngineConfigurator, as it is
too early in the lifecycle. The
+ // setting must be there before the factories/engines are loaded.
+ System.setProperty("org.jruby.embed.localvariable.behavior",
"persistent");
+
+ // Create list of available engines
+ ScriptEngineManager scriptEngineManager = new
ScriptEngineManager();
+ List<ScriptEngineFactory> scriptEngineFactories =
scriptEngineManager.getEngineFactories();
+ if (scriptEngineFactories != null) {
+ scriptEngineFactoryMap = new
HashMap<>(scriptEngineFactories.size());
+ List<AllowableValue> engineList = new LinkedList<>();
+ for (ScriptEngineFactory factory : scriptEngineFactories) {
+ engineList.add(new
AllowableValue(factory.getLanguageName()));
+ scriptEngineFactoryMap.put(factory.getLanguageName(),
factory);
+ }
+
+ // Sort the list by name so the list always looks the same.
+ Collections.sort(engineList, new Comparator<AllowableValue>() {
+ @Override
+ public int compare(AllowableValue o1, AllowableValue o2) {
+ if (o1 == null) {
+ return o2 == null ? 0 : 1;
+ }
+ if (o2 == null) {
+ return -1;
+ }
+ return o1.getValue().compareTo(o2.getValue());
+ }
+ });
+
+ AllowableValue[] engines = engineList.toArray(new
AllowableValue[engineList.size()]);
+
+ SCRIPT_ENGINE = new PropertyDescriptor.Builder()
+ .name("Script Engine")
+ .required(true)
+ .description("The engine to execute scripts")
+ .allowableValues(engines)
+ .defaultValue(engines[0].getValue())
+ .required(true)
+ .expressionLanguageSupported(false)
+ .build();
+ descriptors.add(SCRIPT_ENGINE);
+ }
+
+ descriptors.add(SCRIPT_FILE);
+ descriptors.add(SCRIPT_BODY);
+ descriptors.add(SCRIPT_ARGS);
+ descriptors.add(MODULES);
+
+ isInitialized.set(true);
+ }
+
+ /**
+ * Returns the valid relationships for this processor.
+ *
+ * @return a Set of Relationships supported by this processor
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
--- End diff --
Similar to how some of the other processors work, the user might want the
option to route the incoming flowfile to a different destination. Could we have
a `REL_ORIGINAL` relationship so the user can decide where the original
flowfile is routed?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---