NIFI-583: Ignore STDIN for ExecuteStreamCommand - Added the ability (default: false) to ignore STDIN when passing a flowfile to the ExecuteStreamCommand processor. This is useful if the command you are executing cannot take STDIN, or passing STDIN is unnecessary
Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6d112849 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6d112849 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6d112849 Branch: refs/heads/NIFI-632 Commit: 6d1128497bcfcfd105fd3d18305051615b254576 Parents: 42a2fc5 Author: ricky <[email protected]> Authored: Mon May 4 17:12:04 2015 -0400 Committer: Aldrin Piri <[email protected]> Committed: Fri Jun 5 14:17:39 2015 -0400 ---------------------------------------------------------------------- .../standard/ExecuteStreamCommand.java | 25 ++++++++++++++++-- .../standard/TestExecuteStreamCommand.java | 27 ++++++++++++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6d112849/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index 4c4288a..f97a455 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -89,7 +89,13 @@ import org.apache.nifi.stream.io.StreamUtils; * <li>Supports expression language: true</li> * </ul> * </li> - * + * <li>Ignore STDIN + * <ul> + * <li>Indicates whether or not the flowfile's contents should be streamed as part of STDIN</li> + * <li>Default value: false (this means that the contents of a flowfile will be sent as STDIN to your command</li> + * <li>Supports expression language: false</li> + * </ul> + * </li> * </ul> * * <p> @@ -177,12 +183,22 @@ public class ExecuteStreamCommand extends AbstractProcessor { .required(false) .build(); + static final PropertyDescriptor IGNORE_STDIN = new PropertyDescriptor.Builder() + .name("Ignore STDIN") + .description("If true, the contents of the incoming flowfile will not be passed to the executing command") + .addValidator(Validator.VALID) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES; static { List<PropertyDescriptor> props = new ArrayList<>(); props.add(EXECUTION_ARGUMENTS); props.add(EXECUTION_COMMAND); + props.add(IGNORE_STDIN); props.add(WORKING_DIR); PROPERTIES = Collections.unmodifiableList(props); } @@ -225,6 +241,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue(); args.add(executeCommand); final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue(); + final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue()); if (!StringUtils.isBlank(commandArguments)) { for (String arg : commandArguments.split(";")) { args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue()); @@ -269,7 +286,11 @@ public class ExecuteStreamCommand extends AbstractProcessor { final BufferedOutputStream bos = new BufferedOutputStream(pos); FlowFile outputStreamFlowFile = session.create(flowFile); StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process); - session.read(flowFile, callback); + if (ignoreStdin) { + session.read(outputStreamFlowFile, callback); + } else { + session.read(flowFile, callback); + } outputStreamFlowFile = callback.outputStreamFlowFile; exitCode = callback.exitCode; logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode}); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6d112849/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java index 555c3e4..90fb28f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java @@ -132,7 +132,7 @@ public class TestExecuteStreamCommand { String result = new String(byteArray); assertTrue(result.contains(File.separator + "nifi-standard-processors:ModifiedResult\r\n") - || result.contains(File.separator + "nifi-standard-processors:ModifiedResult\n")); + || result.contains(File.separator + "nifi-standard-processors:ModifiedResult\n")); } @Test @@ -154,7 +154,30 @@ public class TestExecuteStreamCommand { byte[] byteArray = flowFiles.get(0).toByteArray(); String result = new String(byteArray); assertTrue(result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\r\n") - || result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n")); + || result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n")); + } + + @Test + public void testIgnoredStdin() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + byte[] byteArray = flowFiles.get(0).toByteArray(); + String result = new String(byteArray); + assertTrue("TestIngestAndUpdate.jar should not have received anything to modify", + result.endsWith("target:ModifiedResult\n")); } // this is dependent on window with cygwin...so it's not enabled
