Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 42a2fc5f6 -> 6d1128497


NIFI-583: Ignore STDIN for ExecuteStreamCommand

- Added the ability (default: false) to ignore STDIN when passing
  a flowfile to the ExecuteStreamCommand processor. This is useful if
  the command you are executing cannot take STDIN, or passing STDIN is
  unnecessary

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6d112849
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6d112849
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6d112849

Branch: refs/heads/develop
Commit: 6d1128497bcfcfd105fd3d18305051615b254576
Parents: 42a2fc5
Author: ricky <[email protected]>
Authored: Mon May 4 17:12:04 2015 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Fri Jun 5 14:17:39 2015 -0400

----------------------------------------------------------------------
 .../standard/ExecuteStreamCommand.java          | 25 ++++++++++++++++--
 .../standard/TestExecuteStreamCommand.java      | 27 ++++++++++++++++++--
 2 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6d112849/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 4c4288a..f97a455 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -89,7 +89,13 @@ import org.apache.nifi.stream.io.StreamUtils;
  * <li>Supports expression language: true</li>
  * </ul>
  * </li>
- *
+ * <li>Ignore STDIN
+ * <ul>
+ * <li>Indicates whether or not the flowfile's contents should be streamed as 
part of STDIN</li>
+ * <li>Default value: false (this means that the contents of a flowfile will 
be sent as STDIN to your command</li>
+ * <li>Supports expression language: false</li>
+ * </ul>
+ * </li>
  * </ul>
  *
  * <p>
@@ -177,12 +183,22 @@ public class ExecuteStreamCommand extends 
AbstractProcessor {
             .required(false)
             .build();
 
+    static final PropertyDescriptor IGNORE_STDIN = new 
PropertyDescriptor.Builder()
+            .name("Ignore STDIN")
+            .description("If true, the contents of the incoming flowfile will 
not be passed to the executing command")
+            .addValidator(Validator.VALID)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+
     private static final List<PropertyDescriptor> PROPERTIES;
 
     static {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(EXECUTION_ARGUMENTS);
         props.add(EXECUTION_COMMAND);
+        props.add(IGNORE_STDIN);
         props.add(WORKING_DIR);
         PROPERTIES = Collections.unmodifiableList(props);
     }
@@ -225,6 +241,7 @@ public class ExecuteStreamCommand extends AbstractProcessor 
{
         final String executeCommand = 
context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
         args.add(executeCommand);
         final String commandArguments = 
context.getProperty(EXECUTION_ARGUMENTS).getValue();
+        final boolean ignoreStdin = 
Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
         if (!StringUtils.isBlank(commandArguments)) {
             for (String arg : commandArguments.split(";")) {
                 
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
@@ -269,7 +286,11 @@ public class ExecuteStreamCommand extends 
AbstractProcessor {
             final BufferedOutputStream bos = new BufferedOutputStream(pos);
             FlowFile outputStreamFlowFile = session.create(flowFile);
             StdInWriterCallback callback = new StdInWriterCallback(bos, bis, 
logger, session, outputStreamFlowFile, process);
-            session.read(flowFile, callback);
+            if (ignoreStdin) {
+                session.read(outputStreamFlowFile, callback);
+            } else {
+                session.read(flowFile, callback);
+            }
             outputStreamFlowFile = callback.outputStreamFlowFile;
             exitCode = callback.exitCode;
             logger.debug("Execution complete for command: {}.  Exited with 
code: {}", new Object[]{executeCommand, exitCode});

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6d112849/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
index 555c3e4..90fb28f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
@@ -132,7 +132,7 @@ public class TestExecuteStreamCommand {
         String result = new String(byteArray);
 
         assertTrue(result.contains(File.separator + 
"nifi-standard-processors:ModifiedResult\r\n")
-                || result.contains(File.separator + 
"nifi-standard-processors:ModifiedResult\n"));
+          || result.contains(File.separator + 
"nifi-standard-processors:ModifiedResult\n"));
     }
 
     @Test
@@ -154,7 +154,30 @@ public class TestExecuteStreamCommand {
         byte[] byteArray = flowFiles.get(0).toByteArray();
         String result = new String(byteArray);
         assertTrue(result.contains(File.separator + "nifi-standard-processors" 
+ File.separator + "target:ModifiedResult\r\n")
-                || result.contains(File.separator + "nifi-standard-processors" 
+ File.separator + "target:ModifiedResult\n"));
+          || result.contains(File.separator + "nifi-standard-processors" + 
File.separator + "target:ModifiedResult\n"));
+    }
+
+    @Test
+    public void testIgnoredStdin() throws IOException {
+        File exJar = new 
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+        File dummy = new 
File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        String jarPath = exJar.getAbsolutePath();
+        exJar.setExecutable(true);
+        final TestRunner controller = 
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue(dummy.toPath());
+        controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, 
"-jar;" + jarPath);
+        controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+        controller.run(1);
+        
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 
1);
+        List<MockFlowFile> flowFiles = 
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+        byte[] byteArray = flowFiles.get(0).toByteArray();
+        String result = new String(byteArray);
+        assertTrue("TestIngestAndUpdate.jar should not have received anything 
to modify",
+          result.endsWith("target:ModifiedResult\n"));
     }
 
     // this is dependent on window with cygwin...so it's not enabled

Reply via email to