NIFI-583 Adjusting the callback to be aware of whether or not STDIN should be ignored, preferring to bypass the stream copying process and the reuse of the outputflowfile as the source.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/096ca61e Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/096ca61e Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/096ca61e Branch: refs/heads/NIFI-632 Commit: 096ca61e255a6b4f91d525239ca95aefb508fb2e Parents: ceda661 Author: Aldrin Piri <[email protected]> Authored: Sun Jun 7 15:02:57 2015 -0400 Committer: Aldrin Piri <[email protected]> Committed: Sun Jun 7 16:01:54 2015 -0400 ---------------------------------------------------------------------- .../standard/ExecuteStreamCommand.java | 44 ++++++++++---------- 1 file changed, 23 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/096ca61e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index f97a455..676bd07 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -232,22 +232,22 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (null == flowFile) { + FlowFile inputFlowFile = session.get(); + if (null == inputFlowFile) { return; } final ArrayList<String> args = new ArrayList<>(); - final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue(); + final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue(); args.add(executeCommand); final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue(); final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue()); if (!StringUtils.isBlank(commandArguments)) { for (String arg : commandArguments.split(";")) { - args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue()); + args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(inputFlowFile).getValue()); } } - final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue(); + final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue(); final ProcessBuilder builder = new ProcessBuilder(); @@ -284,13 +284,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) { int exitCode = -1; final BufferedOutputStream bos = new BufferedOutputStream(pos); - FlowFile outputStreamFlowFile = session.create(flowFile); - StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process); - if (ignoreStdin) { - session.read(outputStreamFlowFile, callback); - } else { - session.read(flowFile, callback); - } + FlowFile outputStreamFlowFile = session.create(inputFlowFile); + ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, session, outputStreamFlowFile, process); + session.read(inputFlowFile, callback); outputStreamFlowFile = callback.outputStreamFlowFile; exitCode = callback.exitCode; logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode}); @@ -321,9 +317,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { attributes.put("execution.command.args", commandArguments); outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes); session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP); - logger.info("Transferring flow file {} to original", new Object[]{flowFile}); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, ORIGINAL_RELATIONSHIP); + logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile}); + inputFlowFile = session.putAllAttributes(inputFlowFile, attributes); + session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP); } catch (final IOException ex) { // could not close Process related streams @@ -333,8 +329,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { } } - static class StdInWriterCallback implements InputStreamCallback { + static class ProcessStreamWriterCallback implements InputStreamCallback { + final boolean ignoreStdin; final OutputStream stdInWritable; final InputStream stdOutReadable; final ProcessorLog logger; @@ -343,7 +340,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { FlowFile outputStreamFlowFile; int exitCode; - public StdInWriterCallback(OutputStream stdInWritable, InputStream stdOutReadable, ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) { + public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdInWritable, InputStream stdOutReadable, + ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) { + this.ignoreStdin = ignoreStdin; this.stdInWritable = stdInWritable; this.stdOutReadable = stdOutReadable; this.logger = logger; @@ -358,14 +357,17 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override public void process(OutputStream out) throws IOException { + Thread writerThread = new Thread(new Runnable() { @Override public void run() { - try { - StreamUtils.copy(incomingFlowFileIS, stdInWritable); - } catch (IOException e) { - logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e); + if (!ignoreStdin) { + try { + StreamUtils.copy(incomingFlowFileIS, stdInWritable); + } catch (IOException e) { + logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e); + } } // MUST close the output stream to the stdIn so that whatever is reading knows // there is no more data
