NIFI-583 Adjusting the callback to be aware of whether or not STDIN should be
ignored. When STDIN is ignored, the callback now bypasses the stream copying
process instead of reusing the output FlowFile as the source.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/096ca61e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/096ca61e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/096ca61e

Branch: refs/heads/NIFI-632
Commit: 096ca61e255a6b4f91d525239ca95aefb508fb2e
Parents: ceda661
Author: Aldrin Piri <[email protected]>
Authored: Sun Jun 7 15:02:57 2015 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Sun Jun 7 16:01:54 2015 -0400

----------------------------------------------------------------------
 .../standard/ExecuteStreamCommand.java          | 44 ++++++++++----------
 1 file changed, 23 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/096ca61e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index f97a455..676bd07 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -232,22 +232,22 @@ public class ExecuteStreamCommand extends 
AbstractProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (null == flowFile) {
+        FlowFile inputFlowFile = session.get();
+        if (null == inputFlowFile) {
             return;
         }
 
         final ArrayList<String> args = new ArrayList<>();
-        final String executeCommand = 
context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
+        final String executeCommand = 
context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
         args.add(executeCommand);
         final String commandArguments = 
context.getProperty(EXECUTION_ARGUMENTS).getValue();
         final boolean ignoreStdin = 
Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
         if (!StringUtils.isBlank(commandArguments)) {
             for (String arg : commandArguments.split(";")) {
-                
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
+                
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(inputFlowFile).getValue());
             }
         }
-        final String workingDir = 
context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue();
+        final String workingDir = 
context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue();
 
         final ProcessBuilder builder = new ProcessBuilder();
 
@@ -284,13 +284,9 @@ public class ExecuteStreamCommand extends 
AbstractProcessor {
                 final BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(pes))) {
             int exitCode = -1;
             final BufferedOutputStream bos = new BufferedOutputStream(pos);
-            FlowFile outputStreamFlowFile = session.create(flowFile);
-            StdInWriterCallback callback = new StdInWriterCallback(bos, bis, 
logger, session, outputStreamFlowFile, process);
-            if (ignoreStdin) {
-                session.read(outputStreamFlowFile, callback);
-            } else {
-                session.read(flowFile, callback);
-            }
+            FlowFile outputStreamFlowFile = session.create(inputFlowFile);
+            ProcessStreamWriterCallback callback = new 
ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, session, 
outputStreamFlowFile, process);
+            session.read(inputFlowFile, callback);
             outputStreamFlowFile = callback.outputStreamFlowFile;
             exitCode = callback.exitCode;
             logger.debug("Execution complete for command: {}.  Exited with 
code: {}", new Object[]{executeCommand, exitCode});
@@ -321,9 +317,9 @@ public class ExecuteStreamCommand extends AbstractProcessor 
{
             attributes.put("execution.command.args", commandArguments);
             outputStreamFlowFile = 
session.putAllAttributes(outputStreamFlowFile, attributes);
             session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP);
-            logger.info("Transferring flow file {} to original", new 
Object[]{flowFile});
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, ORIGINAL_RELATIONSHIP);
+            logger.info("Transferring flow file {} to original", new 
Object[]{inputFlowFile});
+            inputFlowFile = session.putAllAttributes(inputFlowFile, 
attributes);
+            session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
 
         } catch (final IOException ex) {
             // could not close Process related streams
@@ -333,8 +329,9 @@ public class ExecuteStreamCommand extends AbstractProcessor 
{
         }
     }
 
-    static class StdInWriterCallback implements InputStreamCallback {
+    static class ProcessStreamWriterCallback implements InputStreamCallback {
 
+        final boolean ignoreStdin;
         final OutputStream stdInWritable;
         final InputStream stdOutReadable;
         final ProcessorLog logger;
@@ -343,7 +340,9 @@ public class ExecuteStreamCommand extends AbstractProcessor 
{
         FlowFile outputStreamFlowFile;
         int exitCode;
 
-        public StdInWriterCallback(OutputStream stdInWritable, InputStream 
stdOutReadable, ProcessorLog logger, ProcessSession session, FlowFile 
outputStreamFlowFile, Process process) {
+        public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream 
stdInWritable, InputStream stdOutReadable,
+                                           ProcessorLog logger, ProcessSession 
session, FlowFile outputStreamFlowFile, Process process) {
+            this.ignoreStdin = ignoreStdin;
             this.stdInWritable = stdInWritable;
             this.stdOutReadable = stdOutReadable;
             this.logger = logger;
@@ -358,14 +357,17 @@ public class ExecuteStreamCommand extends 
AbstractProcessor {
 
                 @Override
                 public void process(OutputStream out) throws IOException {
+
                     Thread writerThread = new Thread(new Runnable() {
 
                         @Override
                         public void run() {
-                            try {
-                                StreamUtils.copy(incomingFlowFileIS, 
stdInWritable);
-                            } catch (IOException e) {
-                                logger.error("Failed to write flow file to 
stdIn due to {}", new Object[]{e}, e);
+                            if (!ignoreStdin) {
+                                try {
+                                    StreamUtils.copy(incomingFlowFileIS, 
stdInWritable);
+                                } catch (IOException e) {
+                                    logger.error("Failed to write flow file to 
stdIn due to {}", new Object[]{e}, e);
+                                }
                             }
                             // MUST close the output stream to the stdIn so 
that whatever is reading knows
                             // there is no more data

Reply via email to