This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 4b509aa NIFI-3221 This closes #3396. Add a new property for setting
the argument passing strategy, either the existing parameter, or by adding new
dynamic parameters, along with implementation and tests. This allows for passing
arguments with quotes.
4b509aa is described below
commit 4b509aa5a5b3d98d0a23d1c8fce090c8bee1e6d8
Author: Otto Fowler <[email protected]>
AuthorDate: Wed Mar 27 16:46:24 2019 -0400
NIFI-3221 This closes #3396. Add a new property for setting the argument
passing strategy, either the existing parameter, or by adding new dynamic
parameters, along with implementation and tests.
This allows for passing arguments with quotes.
Signed-off-by: Joe Witt <[email protected]>
---
.../processors/standard/ExecuteStreamCommand.java | 122 +++-
.../standard/TestExecuteStreamCommand.java | 680 +++++++++++++++++++++
2 files changed, 795 insertions(+), 7 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 94db1c0..4a68301 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -32,9 +32,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -46,7 +49,9 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -83,6 +88,15 @@ import org.apache.nifi.stream.io.StreamUtils;
* <li>Supports expression language: true</li>
* </ul>
* </li>
+ * <li>Arguments Strategy
+ * <ul>
+ * <li>Selects the strategy to use for arguments to the executable</li>
+ * <ul>
+ * <li>Command Arguments Property: Use the delimited list of arguments from
the Command Arguments Property. Does not support quotations in parameters.</li>
+ * <li>Dynamic Property Arguments: Use Dynamic Properties, with each property
a separate argument. Does support quotes.</li>
+ * </ul>
+ * </ul>
+ * </li>
* <li>Command Arguments
* <ul>
* <li>The arguments to supply to the executable delimited by the ';'
character. Each argument may be an Expression Language statement.</li>
@@ -134,7 +148,13 @@ import org.apache.nifi.stream.io.StreamUtils;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"command execution", "command", "stream", "execute"})
@CapabilityDescription("Executes an external command on the contents of a flow
file, and creates a new flow file with the results of the command.")
-@DynamicProperty(name = "An environment variable name", value = "An
environment variable value", description = "These environment variables are
passed to the process spawned by this Processor")
+@DynamicProperties({
+ @DynamicProperty(name = "An environment variable name", value = "An
environment variable value",
+ description = "These environment variables are passed to the process
spawned by this Processor"),
+ @DynamicProperty(name = "command.argument.<NUMBER>", value = "Argument to
be supplied to the command",
+ description = "These arguments are supplied to the process spawned by
this Processor when using the "
+ + "Command Arguments Strategy : Dynamic Property Arguments. The NUMBER
will determine the order.")
+})
@WritesAttributes({
@WritesAttribute(attribute = "execution.command", description = "The
name of the command executed"),
@WritesAttribute(attribute = "execution.command.args", description =
"The semi-colon delimited list of arguments"),
@@ -167,6 +187,17 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
+ private static final Pattern DYNAMIC_PARAMETER_NAME =
Pattern.compile("command\\.argument\\.(?<commandIndex>[0-9]+)$");
+ public static final String executionArguments = "Command Arguments
Property";
+ public static final String dynamicArguements = "Dynamic Property
Arguments";
+
+ static final AllowableValue EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY = new
AllowableValue(executionArguments, executionArguments,
+ "Arguments to be supplied to the executable are taken from the
Command Arguments property");
+
+ static final AllowableValue DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY = new
AllowableValue(dynamicArguements,dynamicArguements,
+ "Arguments to be supplied to the executable are taken from dynamic
properties");
+
+
private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR =
StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
true);
static final PropertyDescriptor EXECUTION_COMMAND = new
PropertyDescriptor.Builder()
.name("Command Path")
@@ -176,6 +207,16 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
.required(true)
.build();
+ static final PropertyDescriptor ARGUMENTS_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("argumentsStrategy")
+ .displayName("Command Arguments Strategy")
+ .description("Strategy for configuring arguments to be supplied to
the command.")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(false)
+
.allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY.getValue(),DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue())
+ .defaultValue(EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY.getValue())
+ .build();
+
static final PropertyDescriptor EXECUTION_ARGUMENTS = new
PropertyDescriptor.Builder()
.name("Command Arguments")
.description("The arguments to supply to the executable delimited
by the ';' character.")
@@ -245,6 +286,7 @@ public class ExecuteStreamCommand extends AbstractProcessor
{
static {
List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(ARGUMENTS_STRATEGY);
props.add(EXECUTION_ARGUMENTS);
props.add(EXECUTION_COMMAND);
props.add(IGNORE_STDIN);
@@ -298,12 +340,29 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
+ if (!propertyDescriptorName.startsWith("command.argument.")) {
+ return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
- .description("Sets the environment variable '" +
propertyDescriptorName + "' for the process' environment")
+ .description(
+ "Sets the environment variable '" + propertyDescriptorName
+ "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ }
+ // get the number part of the name
+ Matcher matcher =
DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName);
+ if (matcher.matches()) {
+ final String commandIndex = matcher.group("commandIndex");
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .displayName(propertyDescriptorName)
+ .description("Argument passed to command")
+ .dynamic(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .build();
+ }
+ return null;
}
@Override
@@ -315,18 +374,67 @@ public class ExecuteStreamCommand extends
AbstractProcessor {
final ArrayList<String> args = new ArrayList<>();
final boolean putToAttribute =
context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet();
+ final PropertyValue argumentsStrategyPropertyValue =
context.getProperty(ARGUMENTS_STRATEGY);
+ final boolean useDynamicPropertyArguments =
argumentsStrategyPropertyValue.isSet() &&
argumentsStrategyPropertyValue.getValue().equals(DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
final Integer attributeSize =
context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
final String attributeName =
context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue();
final String executeCommand =
context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
args.add(executeCommand);
- final String commandArguments =
context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
final boolean ignoreStdin =
Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
- if (!StringUtils.isBlank(commandArguments)) {
- for (String arg : ArgumentUtils.splitArgs(commandArguments,
context.getProperty(ARG_DELIMITER).getValue().charAt(0))) {
- args.add(arg);
+ final String commandArguments;
+ if (!useDynamicPropertyArguments) {
+ commandArguments =
context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
+ if (!StringUtils.isBlank(commandArguments)) {
+ for (String arg : ArgumentUtils
+ .splitArgs(commandArguments,
context.getProperty(ARG_DELIMITER).getValue().charAt(0))) {
+ args.add(arg);
+ }
+ }
+ } else {
+
+ ArrayList<PropertyDescriptor> propertyDescriptors = new
ArrayList<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ Matcher matcher =
DYNAMIC_PARAMETER_NAME.matcher(entry.getKey().getName());
+ if (matcher.matches()) {
+ propertyDescriptors.add(entry.getKey());
+ }
+ }
+ Collections.sort(propertyDescriptors,(p1,p2) -> {
+ Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(p1.getName());
+ String indexString1 = null;
+ while (matcher.find()) {
+ indexString1 = matcher.group("commandIndex");
+ }
+ matcher = DYNAMIC_PARAMETER_NAME.matcher(p2.getName());
+ String indexString2 = null;
+ while (matcher.find()) {
+ indexString2 = matcher.group("commandIndex");
+ }
+ final int index1 = Integer.parseInt(indexString1);
+ final int index2 = Integer.parseInt(indexString2);
+ if ( index1 > index2 ) {
+ return 1;
+ } else if (index1 < index2) {
+ return -1;
+ }
+ return 0;
+ });
+ for ( final PropertyDescriptor descriptor : propertyDescriptors) {
+
args.add(context.getProperty(descriptor.getName()).evaluateAttributeExpressions(inputFlowFile).getValue());
+ }
+ if (args.size() > 0) {
+ final StringBuilder builder = new StringBuilder();
+
+ for ( int i = 1; i < args.size(); i++) {
+ builder.append(args.get(i)).append("\t");
+ }
+ commandArguments = builder.toString().trim();
+ } else {
+ commandArguments = "";
}
}
+
final String workingDir =
context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue();
final ProcessBuilder builder = new ProcessBuilder();
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
index 08282cd..c8b8763 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
@@ -23,13 +23,21 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -80,6 +88,51 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testExecuteJarDynamicPropArgs() throws Exception {
+ File exJar = new
File("src/test/resources/ExecuteCommand/TestSuccess.jar");
+ File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
1);
+
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+ MockFlowFile outputFlowFile = flowFiles.get(0);
+ byte[] byteArray = outputFlowFile.toByteArray();
+ String result = new String(byteArray);
+ assertTrue(Pattern.compile("Test was a
success\r?\n").matcher(result).find());
+ assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+ assertEquals("java", outputFlowFile.getAttribute("execution.command"));
+ assertEquals("-jar",
outputFlowFile.getAttribute("execution.command.args").substring(0, 4).trim());
+ String attribute =
outputFlowFile.getAttribute("execution.command.args");
+ String expected = "src" + File.separator + "test" + File.separator +
"resources" + File.separator + "ExecuteCommand" + File.separator +
"TestSuccess.jar";
+ assertEquals(expected, attribute.substring(attribute.length() -
expected.length()));
+
+ MockFlowFile originalFlowFile =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP).get(0);
+ assertEquals(outputFlowFile.getAttribute("execution.status"),
originalFlowFile.getAttribute("execution.status"));
+ assertEquals(outputFlowFile.getAttribute("execution.command"),
originalFlowFile.getAttribute("execution.command"));
+ assertEquals(outputFlowFile.getAttribute("execution.command.args"),
originalFlowFile.getAttribute("execution.command.args"));
+ }
+
+
+ @Test
public void testExecuteJarWithBadPath() throws Exception {
File exJar = new
File("src/test/resources/ExecuteCommand/noSuchFile.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -101,6 +154,39 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testExecuteJarWithBadPathDynamicProperties() throws Exception {
+ File exJar = new
File("src/test/resources/ExecuteCommand/noSuchFile.jar");
+ File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
0);
+
controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP,
1);
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP);
+ MockFlowFile flowFile = flowFiles.get(0);
+ assertEquals(0, flowFile.getSize());
+ assertEquals("Error: Unable to access jarfile",
flowFile.getAttribute("execution.error").substring(0, 31));
+ assertTrue(flowFile.isPenalized());
+ }
+
+ @Test
public void testExecuteIngestAndUpdate() throws IOException {
File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -129,6 +215,46 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testExecuteIngestAndUpdateDynamicProperties() throws
IOException {
+ File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+ File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ File dummy10MBytes = new File("target/10MB.txt");
+ try (FileOutputStream fos = new FileOutputStream(dummy10MBytes)) {
+ byte[] bytes = Files.readAllBytes(dummy.toPath());
+ assertEquals(1000, bytes.length);
+ for (int i = 0; i < 10000; i++) {
+ fos.write(bytes, 0, 1000);
+ }
+ }
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.enqueue(dummy10MBytes.toPath());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
1);
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+ byte[] byteArray = flowFiles.get(0).toByteArray();
+ String result = new String(byteArray);
+
+
assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find());
+ }
+
+ @Test
public void testLoggingToStdErr() throws IOException {
File exJar = new
File("src/test/resources/ExecuteCommand/TestLogStdErr.jar");
File dummy = new File("src/test/resources/ExecuteCommand/1mb.txt");
@@ -149,6 +275,38 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testLoggingToStdErrDynamicProperties() throws IOException {
+ File exJar = new
File("src/test/resources/ExecuteCommand/TestLogStdErr.jar");
+ File dummy = new File("src/test/resources/ExecuteCommand/1mb.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
1);
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+ MockFlowFile flowFile = flowFiles.get(0);
+ assertEquals(0, flowFile.getSize());
+ assertEquals("fffffffffffffffffffffffffffffff",
flowFile.getAttribute("execution.error").substring(0, 31));
+ }
+
+ @Test
public void testExecuteIngestAndUpdateWithWorkingDir() throws IOException {
File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -171,6 +329,40 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testExecuteIngestAndUpdateWithWorkingDirDynamicProperties()
throws IOException {
+ File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+ File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
1);
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+ byte[] byteArray = flowFiles.get(0).toByteArray();
+ String result = new String(byteArray);
+
+ final String quotedSeparator = Pattern.quote(File.separator);
+ assertTrue(Pattern.compile(quotedSeparator +
"nifi-standard-processors" + quotedSeparator +
"target:ModifiedResult\r?\n").matcher(result).find());
+ }
+
+ @Test
public void testIgnoredStdin() throws IOException {
File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -192,6 +384,40 @@ public class TestExecuteStreamCommand {
Pattern.compile("target:ModifiedResult\r?\n$").matcher(result).find());
}
+ @Test
+ public void testIgnoredStdinDynamicProperties() throws IOException {
+ File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+ File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
1);
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+ byte[] byteArray = flowFiles.get(0).toByteArray();
+ String result = new String(byteArray);
+ assertTrue("TestIngestAndUpdate.jar should not have received anything
to modify",
+
Pattern.compile("target:ModifiedResult\r?\n$").matcher(result).find());
+ }
+
// this is dependent on window with cygwin...so it's not enabled
@Ignore
@Test
@@ -243,6 +469,43 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testDynamicEnvironmentDynamicProperties() throws Exception {
+ File exJar = new
File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
+ File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setProperty("NIFI_TEST_1", "testvalue1");
+ controller.setProperty("NIFI_TEST_2", "testvalue2");
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
1);
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+ byte[] byteArray = flowFiles.get(0).toByteArray();
+ String result = new String(byteArray);
+ Set<String> dynamicEnvironmentVariables = new
HashSet<>(Arrays.asList(result.split("\r?\n")));
+ assertFalse("Should contain at least two environment variables
starting with NIFI", dynamicEnvironmentVariables.size() < 2);
+ assertTrue("NIFI_TEST_1 environment variable is missing",
dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1"));
+ assertTrue("NIFI_TEST_2 environment variable is missing",
dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2"));
+ }
+
+ @Test
public void testSmallEchoPutToAttribute() throws Exception {
File dummy = new File("src/test/resources/hello.txt");
assertTrue(dummy.exists());
@@ -274,6 +537,108 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testSmallEchoPutToAttributeDynamicProperties() throws
Exception {
+ File dummy = new File("src/test/resources/hello.txt");
+ assertTrue(dummy.exists());
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.enqueue("".getBytes());
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+
+ if(isWindows()) {
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND,
"cmd.exe");
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "/c");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, "echo Hello");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS,
"/c;echo Hello");
+ } else{
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND,
"echo");
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "Hello");
+ }
+ controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE,
"executeStreamCommand.output");
+
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
0);
+
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ MockFlowFile outputFlowFile = flowFiles.get(0);
+ outputFlowFile.assertContentEquals("");
+ String ouput =
outputFlowFile.getAttribute("executeStreamCommand.output");
+ assertTrue(ouput.startsWith("Hello"));
+ assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+ assertEquals(isWindows() ? "cmd.exe" : "echo",
outputFlowFile.getAttribute("execution.command"));
+ }
+
+ @Test
+ public void testArgumentsWithQuotesFromAttributeDynamicProperties() throws
Exception {
+ File dummy = new File("src/test/resources/TestJson/json-sample.json");
+ assertTrue(dummy.exists());
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+
+ Map<String,String> attrs = new HashMap<>();
+
+ String json = FileUtils.readFileToString(dummy,
StandardCharsets.UTF_8);
+ attrs.put("json.attribute",json);
+ controller.enqueue("".getBytes(),attrs);
+
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+
+ if(isWindows()) {
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND,
"cmd.exe");
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "/c");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, "echo");
+ } else{
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND,
"echo");
+ }
+ PropertyDescriptor dynamicProp3 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.3")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp3, "${json.attribute}");
+ controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
1);
+
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+ MockFlowFile outputFlowFile = flowFiles.get(0);
+ String output = new String(outputFlowFile.toByteArray());
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode tree1 = mapper.readTree(json);
+ JsonNode tree2 = mapper.readTree(output);
+ assertEquals(tree1,tree2);
+ assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+ assertEquals(isWindows() ? "cmd.exe" : "echo",
outputFlowFile.getAttribute("execution.command"));
+ }
+
+ @Test
public void testExecuteJarPutToAttribute() throws Exception {
File exJar = new
File("src/test/resources/ExecuteCommand/TestSuccess.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -302,6 +667,46 @@ public class TestExecuteStreamCommand {
}
@Test
+    // Runs "java -jar <TestSuccess.jar>" with both arguments supplied through the dynamic
+    // properties command.argument.1 and command.argument.2, and verifies that the command
+    // output lands in the "executeStreamCommand.output" attribute of the original flow file.
+    public void testExecuteJarPutToAttributeDynamicProperties() throws Exception {
+        final File successJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
+        final File inputFile = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        final String successJarPath = successJar.getAbsolutePath();
+        successJar.setExecutable(true);
+
+        final TestRunner runner = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        runner.enqueue(inputFile.toPath());
+        runner.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        runner.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
+                ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+
+        // First argument: the -jar switch.
+        final PropertyDescriptor jarSwitchArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.1")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarSwitchArgument, "-jar");
+
+        // Second argument: absolute path of the jar to run.
+        final PropertyDescriptor jarPathArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.2")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarPathArgument, successJarPath);
+
+        runner.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        runner.run(1);
+
+        // Output goes to an attribute, so only the original relationship receives a flow file.
+        runner.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        runner.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+        final List<MockFlowFile> originals =
+                runner.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        final MockFlowFile original = originals.get(0);
+        final String capturedOutput = original.getAttribute("executeStreamCommand.output");
+        original.assertContentEquals(inputFile);
+
+        final Pattern successLine = Pattern.compile("Test was a success\r?\n");
+        assertTrue(successLine.matcher(capturedOutput).find());
+        assertEquals("0", original.getAttribute("execution.status"));
+        assertEquals("java", original.getAttribute("execution.command"));
+
+        // The recorded arguments must start with "-jar" and end with the relative jar path.
+        final String commandArgs = original.getAttribute("execution.command.args");
+        assertEquals("-jar", commandArgs.substring(0, 4));
+        final String sep = File.separator;
+        final String expectedSuffix = "src" + sep + "test" + sep + "resources" + sep
+                + "ExecuteCommand" + sep + "TestSuccess.jar";
+        assertEquals(expectedSuffix,
+                commandArgs.substring(commandArgs.length() - expectedSuffix.length()));
+    }
+
+ @Test
public void testExecuteJarToAttributeConfiguration() throws Exception {
File exJar = new
File("src/test/resources/ExecuteCommand/TestSuccess.jar");
String jarPath = exJar.getAbsolutePath();
@@ -332,6 +737,48 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testExecuteJarToAttributeConfigurationDyanmicProperties()
throws Exception {
+ File exJar = new
File("src/test/resources/ExecuteCommand/TestSuccess.jar");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.enqueue("small test".getBytes());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH,
"10");
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE,
"outputDest");
+ assertEquals(1,
controller.getProcessContext().getAvailableRelationships().size());
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP,
0);
+
controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP,
0);
+
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ MockFlowFile outputFlowFile = flowFiles.get(0);
+ outputFlowFile.assertContentEquals("small test".getBytes());
+ String result = outputFlowFile.getAttribute("outputDest");
+ assertTrue(Pattern.compile("Test was a").matcher(result).find());
+ assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+ assertEquals("java", outputFlowFile.getAttribute("execution.command"));
+ assertEquals("-jar",
outputFlowFile.getAttribute("execution.command.args").substring(0,4));
+ String attribute =
outputFlowFile.getAttribute("execution.command.args");
+ String expected = "src" + File.separator + "test" + File.separator +
"resources" + File.separator + "ExecuteCommand" + File.separator +
"TestSuccess.jar";
+ assertEquals(expected, attribute.substring(attribute.length() -
expected.length()));
+ }
+
+ @Test
public void testExecuteIngestAndUpdatePutToAttribute() throws IOException {
File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -359,6 +806,45 @@ public class TestExecuteStreamCommand {
}
@Test
+    // Feeds a generated 10,000,000-byte file to "java -jar <TestIngestAndUpdate.jar>"
+    // (arguments passed as dynamic properties) and checks the text captured in the
+    // "outputDest" attribute.
+    public void testExecuteIngestAndUpdatePutToAttributeDynamicProperties() throws IOException {
+        final File ingestJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+        final File seedFile = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        final File largeInput = new File("target/10MB.txt");
+
+        // Build the large input by writing the first 1000 seed bytes 10000 times.
+        final byte[] seedBytes = Files.readAllBytes(seedFile.toPath());
+        try (FileOutputStream largeOut = new FileOutputStream(largeInput)) {
+            for (int chunk = 0; chunk < 10000; chunk++) {
+                largeOut.write(seedBytes, 0, 1000);
+            }
+        }
+
+        final String ingestJarPath = ingestJar.getAbsolutePath();
+        ingestJar.setExecutable(true);
+
+        final TestRunner runner = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        runner.enqueue(largeInput.toPath());
+        runner.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        runner.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
+                ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+
+        final PropertyDescriptor jarSwitchArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.1")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarSwitchArgument, "-jar");
+
+        final PropertyDescriptor jarPathArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.2")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarPathArgument, ingestJarPath);
+
+        runner.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest");
+        runner.run(1);
+
+        runner.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        runner.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+        final List<MockFlowFile> originals =
+                runner.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        final String capturedOutput = originals.get(0).getAttribute("outputDest");
+
+        final Pattern modifiedResultLine =
+                Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n");
+        assertTrue(modifiedResultLine.matcher(capturedOutput).find());
+    }
+
+ @Test
public void testLargePutToAttribute() throws IOException {
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
File dummy10MBytes = new File("target/10MB.txt");
@@ -394,6 +880,57 @@ public class TestExecuteStreamCommand {
}
@Test
+    // Prints a generated 10,000,000-byte file with "cat" (or "cmd.exe /c type ..." on
+    // Windows), using dynamic-property arguments, and checks that the output attribute
+    // holds exactly 256 'a' characters when the maximum attribute length is set to 256.
+    public void testLargePutToAttributeDynamicProperties() throws IOException {
+        final File seedFile = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        final File largeFile = new File("target/10MB.txt");
+
+        // Build the large file by writing the first 1000 seed bytes 10000 times.
+        final byte[] seedBytes = Files.readAllBytes(seedFile.toPath());
+        try (FileOutputStream largeOut = new FileOutputStream(largeFile)) {
+            for (int chunk = 0; chunk < 10000; chunk++) {
+                largeOut.write(seedBytes, 0, 1000);
+            }
+        }
+
+        final TestRunner runner = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        runner.enqueue("".getBytes());
+        runner.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
+                ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+
+        if (!isWindows()) {
+            // Non-Windows: cat <file>
+            runner.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cat");
+            final PropertyDescriptor filePathArgument = new PropertyDescriptor.Builder()
+                    .dynamic(true)
+                    .name("command.argument.1")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .build();
+            runner.setProperty(filePathArgument, largeFile.getAbsolutePath());
+        } else {
+            // Windows: cmd.exe /c "type <file>"
+            runner.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe");
+            final PropertyDescriptor shellSwitchArgument = new PropertyDescriptor.Builder()
+                    .dynamic(true)
+                    .name("command.argument.1")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .build();
+            runner.setProperty(shellSwitchArgument, "/c");
+            final PropertyDescriptor typeCommandArgument = new PropertyDescriptor.Builder()
+                    .dynamic(true)
+                    .name("command.argument.2")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .build();
+            runner.setProperty(typeCommandArgument, "type " + largeFile.getAbsolutePath());
+        }
+
+        runner.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+        runner.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        runner.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "256");
+
+        runner.run(1);
+
+        runner.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        runner.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+        final List<MockFlowFile> originals =
+                runner.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        final MockFlowFile original = originals.get(0);
+        original.assertAttributeEquals("execution.status", "0");
+
+        // The whole attribute value must be exactly 256 'a' characters.
+        final String capturedOutput = original.getAttribute("executeStreamCommand.output");
+        final Pattern exactly256As = Pattern.compile("a{256}");
+        assertTrue(exactly256As.matcher(capturedOutput).matches());
+    }
+
+ @Test
public void testExecuteIngestAndUpdateWithWorkingDirPutToAttribute()
throws IOException {
File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -415,6 +952,39 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void
testExecuteIngestAndUpdateWithWorkingDirPutToAttributeDynamicProperties()
throws IOException {
+ File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+ File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller =
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE,
"streamOutput");
+ controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+ PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.1")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp1, "-jar");
+ PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
+ .dynamic(true)
+ .name("command.argument.2")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ controller.setProperty(dynamicProp2, jarPath);
+ controller.run(1);
+
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ List<MockFlowFile> flowFiles =
controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ String result = flowFiles.get(0).getAttribute("streamOutput");
+
+ final String quotedSeparator = Pattern.quote(File.separator);
+ assertTrue(Pattern.compile(quotedSeparator +
"nifi-standard-processors" + quotedSeparator +
"target:ModifiedResult\r?\n").matcher(result).find());
+ }
+
+ @Test
public void testIgnoredStdinPutToAttribute() throws IOException {
File exJar = new
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -436,6 +1006,39 @@ public class TestExecuteStreamCommand {
}
@Test
+    // Runs "java -jar <TestIngestAndUpdate.jar>" with IGNORE_STDIN enabled and arguments
+    // passed as dynamic properties, then checks the text captured in the
+    // "executeStreamCommand.output" attribute.
+    public void testIgnoredStdinPutToAttributeDynamicProperties() throws IOException {
+        final File ingestJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+        final File inputFile = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        final String ingestJarPath = ingestJar.getAbsolutePath();
+        ingestJar.setExecutable(true);
+
+        final TestRunner runner = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        runner.enqueue(inputFile.toPath());
+        runner.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+        runner.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        runner.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
+                ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+
+        final PropertyDescriptor jarSwitchArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.1")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarSwitchArgument, "-jar");
+
+        final PropertyDescriptor jarPathArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.2")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarPathArgument, ingestJarPath);
+
+        runner.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+        runner.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        runner.run(1);
+
+        runner.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        final List<MockFlowFile> originals =
+                runner.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        final String capturedOutput = originals.get(0).getAttribute("executeStreamCommand.output");
+
+        // The trailing line terminator is optional in this pattern.
+        final Pattern modifiedResult = Pattern.compile("target:ModifiedResult\r?\n?");
+        assertTrue("TestIngestAndUpdate.jar should not have received anything to modify",
+                modifiedResult.matcher(capturedOutput).find());
+    }
+
+ @Test
public void testDynamicEnvironmentPutToAttribute() throws Exception {
File exJar = new
File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
File dummy = new
File("src/test/resources/ExecuteCommand/1000bytes.txt");
@@ -460,6 +1063,42 @@ public class TestExecuteStreamCommand {
}
@Test
+    // Sets the properties NIFI_TEST_1 and NIFI_TEST_2, runs
+    // "java -jar <TestDynamicEnvironment.jar>" with dynamic-property arguments, and checks
+    // that both NAME=value pairs appear as lines in the "executeStreamCommand.output" attribute.
+    public void testDynamicEnvironmentPutToAttributeDynamicProperties() throws Exception {
+        final File environmentJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
+        final File inputFile = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        final String environmentJarPath = environmentJar.getAbsolutePath();
+        environmentJar.setExecutable(true);
+
+        final TestRunner runner = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        runner.setProperty("NIFI_TEST_1", "testvalue1");
+        runner.setProperty("NIFI_TEST_2", "testvalue2");
+        runner.enqueue(inputFile.toPath());
+        runner.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+        runner.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        runner.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
+                ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+
+        final PropertyDescriptor jarSwitchArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.1")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarSwitchArgument, "-jar");
+
+        final PropertyDescriptor jarPathArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.2")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarPathArgument, environmentJarPath);
+
+        runner.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        runner.run(1);
+
+        runner.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        final List<MockFlowFile> originals =
+                runner.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        final String capturedOutput = originals.get(0).getAttribute("executeStreamCommand.output");
+
+        // One entry per distinct output line.
+        final String[] outputLines = capturedOutput.split("\r?\n");
+        final Set<String> reportedVariables = new HashSet<>(Arrays.asList(outputLines));
+        assertFalse("Should contain at least two environment variables starting with NIFI",
+                reportedVariables.size() < 2);
+        assertTrue("NIFI_TEST_1 environment variable is missing",
+                reportedVariables.contains("NIFI_TEST_1=testvalue1"));
+        assertTrue("NIFI_TEST_2 environment variable is missing",
+                reportedVariables.contains("NIFI_TEST_2=testvalue2"));
+    }
+
+ @Test
public void testQuotedArguments() throws Exception {
List<String> args = ArgumentUtils.splitArgs("echo -n \"arg1 arg2
arg3\"", ' ');
assertEquals(3, args.size());
@@ -507,6 +1146,47 @@ public class TestExecuteStreamCommand {
assertEquals(expected, attribute.substring(attribute.length() -
expected.length()));
}
+    @Test
+    // Runs "java -jar <noSuchFile.jar>" with the arguments supplied through dynamic
+    // properties and verifies the failure is reported on the original flow file: empty
+    // output attribute, execution.status "1", and the arguments recorded as given.
+    public void testExecuteJarPutToAttributeBadPathDynamicProperties() throws Exception {
+        // The jar path deliberately names a missing file, so no setExecutable(true) call
+        // is made on it: on a missing file that call would have nothing to change.
+        final File missingJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar");
+        final File inputFile = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        final String missingJarPath = missingJar.getAbsolutePath();
+
+        final TestRunner runner = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        runner.enqueue(inputFile.toPath());
+        runner.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        runner.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY,
+                ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
+
+        final PropertyDescriptor jarSwitchArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.1")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarSwitchArgument, "-jar");
+
+        final PropertyDescriptor jarPathArgument = new PropertyDescriptor.Builder()
+                .dynamic(true)
+                .name("command.argument.2")
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+        runner.setProperty(jarPathArgument, missingJarPath);
+
+        runner.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        runner.run(1);
+
+        // Even on failure, the flow file is expected only on the original relationship.
+        runner.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+        runner.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0);
+        runner.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
+        final List<MockFlowFile> originals =
+                runner.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        final MockFlowFile original = originals.get(0);
+        final String capturedOutput = original.getAttribute("executeStreamCommand.output");
+        original.assertContentEquals(inputFile);
+
+        // java -jar with bad path only prints to standard error not standard out
+        assertTrue(capturedOutput.isEmpty());
+        // java -jar with bad path exits with code 1
+        assertEquals("1", original.getAttribute("execution.status"));
+        assertEquals("java", original.getAttribute("execution.command"));
+
+        // The recorded arguments must start with "-jar" and end with the relative jar path.
+        final String commandArgs = original.getAttribute("execution.command.args");
+        assertEquals("-jar", commandArgs.substring(0, 4));
+        final String sep = File.separator;
+        final String expectedSuffix = "src" + sep + "test" + sep + "resources" + sep
+                + "ExecuteCommand" + sep + "noSuchFile.jar";
+        assertEquals(expectedSuffix,
+                commandArgs.substring(commandArgs.length() - expectedSuffix.length()));
+    }
+
private static boolean isWindows() {
    // Lower-case with Locale.ROOT so the comparison does not depend on the JVM's
    // default locale.
    final String osName = System.getProperty("os.name");
    return osName.toLowerCase(java.util.Locale.ROOT).startsWith("windows");
}