NIFI-1081 Adding option to ExecuteStreamCommand to put output value to an attribute
Reviewed and amended (comments,whitespace,and some code readability (discussed in ticket)) by Tony Kurc ([email protected]) Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1e5cc070 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1e5cc070 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1e5cc070 Branch: refs/heads/NIFI-655 Commit: 1e5cc070a3d29736beea9af0b2d684a9bdcfff8e Parents: 9e2f6df Author: Joseph Percivall <[email protected]> Authored: Wed Nov 18 21:16:12 2015 -0500 Committer: Tony Kurc <[email protected]> Committed: Wed Nov 18 23:23:10 2015 -0500 ---------------------------------------------------------------------- NOTICE | 3 + nifi-assembly/NOTICE | 4 + .../standard/ExecuteStreamCommand.java | 200 ++++++++++++---- .../SoftLimitBoundedByteArrayOutputStream.java | 99 ++++++++ .../standard/TestExecuteStreamCommand.java | 237 ++++++++++++++++++- 5 files changed, 488 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index 9686fba..f39a045 100644 --- a/NOTICE +++ b/NOTICE @@ -4,3 +4,6 @@ Copyright 2014-2015 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). +This product includes the following work from the Apache Hadoop project: + +BoundedByteArrayOutputStream.java adapted to SoftLimitBoundedByteArrayOutputStream.java http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-assembly/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 5e18035..6ec5c31 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -4,6 +4,10 @@ Copyright 2014-2015 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). +This product includes the following work from the Apache Hadoop project: + +BoundedByteArrayOutputStream.java which was adapted to SoftLimitBoundedByteArrayOutputStream.java + =========================================== Apache Software License v2 =========================================== http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index 9bea6ba..38c8bd4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -59,6 +60,7 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.ArgumentUtils; +import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.StreamUtils; @@ -126,7 +128,7 @@ import org.apache.nifi.stream.io.StreamUtils; @CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.") @DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor") @WritesAttributes({ - @WritesAttribute(attribute = "execution.command", description = "The name of the command executed to create the new FlowFile"), + @WritesAttribute(attribute = "execution.command", description = "The name of the command executed"), @WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"), @WritesAttribute(attribute = "execution.status", description = "The exit status code returned from executing the command"), @WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command")}) @@ -140,14 +142,10 @@ public class ExecuteStreamCommand extends AbstractProcessor { .name("output stream") .description("The destination path for the flow file created from the command's output") .build(); - private static final Set<Relationship> RELATIONSHIPS; + private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>(); - static { - Set<Relationship> rels = new HashSet<>(); - rels.add(OUTPUT_STREAM_RELATIONSHIP); - rels.add(ORIGINAL_RELATIONSHIP); - RELATIONSHIPS = Collections.unmodifiableSet(rels); - } + private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET; + private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET; private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true); static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder() @@ -195,6 +193,22 @@ public class ExecuteStreamCommand extends AbstractProcessor { .defaultValue("false") .build(); + static final PropertyDescriptor PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Output Destination Attribute") + .description("If set, the output of the stream command will be put into an attribute of the original FlowFile instead of a separate " + + "FlowFile. There will no longer be a relationship for 'output stream'. The value of this property will be the key for the output attribute.") + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .build(); + + static final PropertyDescriptor PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder() + .name("Max Attribute Length") + .description("If routing the output of the stream command to an attribute, the number of characters put to the attribute value " + + "will be at most this amount. This is important because attributes are held in memory and large attributes will quickly " + + "cause out of memory issues. If the output goes longer than this value, it will truncated to fit. Consider making this smaller if able.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("256") + .build(); + private static final Validator characterValidator = new StandardValidators.StringLengthValidator(1, 1); static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder() @@ -216,19 +230,44 @@ public class ExecuteStreamCommand extends AbstractProcessor { props.add(IGNORE_STDIN); props.add(WORKING_DIR); props.add(ARG_DELIMITER); + props.add(PUT_OUTPUT_IN_ATTRIBUTE); + props.add(PUT_ATTRIBUTE_MAX_LENGTH); PROPERTIES = Collections.unmodifiableList(props); + + + Set<Relationship> outputStreamRelationships = new HashSet<>(); + outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP); + outputStreamRelationships.add(ORIGINAL_RELATIONSHIP); + OUTPUT_STREAM_RELATIONSHIP_SET = Collections.unmodifiableSet(outputStreamRelationships); + + Set<Relationship> attributeRelationships = new HashSet<>(); + attributeRelationships.add(ORIGINAL_RELATIONSHIP); + ATTRIBUTE_RELATIONSHIP_SET = Collections.unmodifiableSet(attributeRelationships); } private ProcessorLog logger; @Override public Set<Relationship> getRelationships() { - return RELATIONSHIPS; + return relationships.get(); } @Override protected void init(ProcessorInitializationContext context) { logger = getLogger(); + + relationships.set(OUTPUT_STREAM_RELATIONSHIP_SET); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (descriptor.equals(PUT_OUTPUT_IN_ATTRIBUTE)) { + if (newValue != null) { + relationships.set(ATTRIBUTE_RELATIONSHIP_SET); + } else { + relationships.set(OUTPUT_STREAM_RELATIONSHIP_SET); + } + } } @Override @@ -254,6 +293,10 @@ public class ExecuteStreamCommand extends AbstractProcessor { } final ArrayList<String> args = new ArrayList<>(); + final boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet(); + final Integer attributeSize = context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger(); + final String attributeName = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue(); + final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue(); args.add(executeCommand); final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue(); @@ -300,10 +343,17 @@ public class ExecuteStreamCommand extends AbstractProcessor { final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) { int exitCode = -1; final BufferedOutputStream bos = new BufferedOutputStream(pos); - FlowFile outputStreamFlowFile = session.create(inputFlowFile); - ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, session, outputStreamFlowFile, process); + FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile); + + ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, + attributeName, session, outputFlowFile, process,putToAttribute,attributeSize); session.read(inputFlowFile, callback); - outputStreamFlowFile = callback.outputStreamFlowFile; + + outputFlowFile = callback.outputFlowFile; + if (putToAttribute) { + outputFlowFile = session.putAttribute(outputFlowFile, attributeName, new String(callback.outputBuffer, 0, callback.size)); + } + exitCode = callback.exitCode; logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode}); @@ -321,21 +371,29 @@ public class ExecuteStreamCommand extends AbstractProcessor { int length = strBldr.length() > 4000 ? 4000 : strBldr.length(); attributes.put("execution.error", strBldr.substring(0, length)); + final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP; if (exitCode == 0) { - logger.info("Transferring flow file {} to 'output stream'", new Object[]{outputStreamFlowFile}); + logger.info("Transferring flow file {} to {}", + new Object[]{outputFlowFile,outputFlowFileRelationship.getName()}); } else { - logger.error("Transferring flow file {} to 'output stream'. Executable command {} ended in an error: {}", - new Object[]{outputStreamFlowFile, executeCommand, strBldr.toString()}); + logger.error("Transferring flow file {} to {}. Executable command {} ended in an error: {}", + new Object[]{outputFlowFile,outputFlowFileRelationship.getName(), executeCommand, strBldr.toString()}); } attributes.put("execution.status", Integer.toString(exitCode)); attributes.put("execution.command", executeCommand); attributes.put("execution.command.args", commandArguments); - outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes); - session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP); - logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile}); - inputFlowFile = session.putAllAttributes(inputFlowFile, attributes); - session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP); + outputFlowFile = session.putAllAttributes(outputFlowFile, attributes); + + // This transfer will transfer the FlowFile that received the stream out put to it's destined relationship. + // In the event the stream is put to the an attribute of the original, it will be transferred here. + session.transfer(outputFlowFile, outputFlowFileRelationship); + + if (!putToAttribute) { + logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile}); + inputFlowFile = session.putAllAttributes(inputFlowFile, attributes); + session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP); + } } catch (final IOException ex) { // could not close Process related streams @@ -348,59 +406,97 @@ public class ExecuteStreamCommand extends AbstractProcessor { static class ProcessStreamWriterCallback implements InputStreamCallback { final boolean ignoreStdin; - final OutputStream stdInWritable; - final InputStream stdOutReadable; + final OutputStream stdinWritable; + final InputStream stdoutReadable; final ProcessorLog logger; final ProcessSession session; final Process process; - FlowFile outputStreamFlowFile; + FlowFile outputFlowFile; int exitCode; + final boolean putToAttribute; + final int attributeSize; + final String attributeName; + + byte[] outputBuffer; + int size; - public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdInWritable, InputStream stdOutReadable, - ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) { + public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdinWritable, InputStream stdoutReadable,ProcessorLog logger, String attributeName, + ProcessSession session, FlowFile outputFlowFile, Process process, boolean putToAttribute, int attributeSize) { this.ignoreStdin = ignoreStdin; - this.stdInWritable = stdInWritable; - this.stdOutReadable = stdOutReadable; + this.stdinWritable = stdinWritable; + this.stdoutReadable = stdoutReadable; this.logger = logger; this.session = session; - this.outputStreamFlowFile = outputStreamFlowFile; + this.outputFlowFile = outputFlowFile; this.process = process; + this.putToAttribute = putToAttribute; + this.attributeSize = attributeSize; + this.attributeName = attributeName; } @Override public void process(final InputStream incomingFlowFileIS) throws IOException { - outputStreamFlowFile = session.write(outputStreamFlowFile, new OutputStreamCallback() { + if (putToAttribute) { + try (SoftLimitBoundedByteArrayOutputStream softLimitBoundedBAOS = new SoftLimitBoundedByteArrayOutputStream(attributeSize)) { + readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS); + final long longSize = StreamUtils.copy(stdoutReadable, softLimitBoundedBAOS); + + // Because the outputstream has a cap that the copy doesn't know about, adjust + // the actual size + if (longSize > (long) attributeSize) { // Explicit cast for readability + size = attributeSize; + } else{ + size = (int) longSize; // Note: safe cast, longSize is limited by attributeSize + } + + outputBuffer = softLimitBoundedBAOS.getBuffer(); + stdoutReadable.close(); - @Override - public void process(OutputStream out) throws IOException { - - Thread writerThread = new Thread(new Runnable() { - - @Override - public void run() { - if (!ignoreStdin) { - try { - StreamUtils.copy(incomingFlowFileIS, stdInWritable); - } catch (IOException e) { - logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e); - } - } - // MUST close the output stream to the stdIn so that whatever is reading knows - // there is no more data - IOUtils.closeQuietly(stdInWritable); - } - }); - writerThread.setDaemon(true); - writerThread.start(); - StreamUtils.copy(stdOutReadable, out); try { exitCode = process.waitFor(); } catch (InterruptedException e) { logger.warn("Command Execution Process was interrupted", e); } } - }); + } else { + outputFlowFile = session.write(outputFlowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + + readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS); + StreamUtils.copy(stdoutReadable, out); + try { + exitCode = process.waitFor(); + } catch (InterruptedException e) { + logger.warn("Command Execution Process was interrupted", e); + } + } + }); + } } } + private static void readStdoutReadable(final boolean ignoreStdin, final OutputStream stdinWritable, + final ProcessorLog logger, final InputStream incomingFlowFileIS) throws IOException { + Thread writerThread = new Thread(new Runnable() { + + @Override + public void run() { + if (!ignoreStdin) { + try { + StreamUtils.copy(incomingFlowFileIS, stdinWritable); + } catch (IOException e) { + // This is unlikely to occur, and isn't handled at the moment + // Bug captured in NIFI-1194 + logger.error("Failed to write flow file to stdin due to {}", new Object[]{e}, e); + } + } + // MUST close the output stream to the stdin so that whatever is reading knows + // there is no more data. + IOUtils.closeQuietly(stdinWritable); + } + }); + writerThread.setDaemon(true); + writerThread.start(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java new file mode 100644 index 0000000..95e9a72 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.util; + +import java.io.IOException; +import java.io.OutputStream; + +public class SoftLimitBoundedByteArrayOutputStream extends OutputStream { + /* + * This Bounded Array Output Stream (BAOS) allows the user to write to the output stream up to a specified limit. + * Higher than that limit the BAOS will silently return and not put more into the buffer. It also will not throw an error. + * This effectively truncates the stream for the user to fit into a bounded array. + */ + + private final byte[] buffer; + private int limit; + private int count; + + public SoftLimitBoundedByteArrayOutputStream(int capacity) { + this(capacity, capacity); + } + + public SoftLimitBoundedByteArrayOutputStream(int capacity, int limit) { + if ((capacity < limit) || (capacity | limit) < 0) { + throw new IllegalArgumentException("Invalid capacity/limit"); + } + this.buffer = new byte[capacity]; + this.limit = limit; + this.count = 0; + } + + @Override + public void write(int b) throws IOException { + if (count >= limit) { + return; + } + buffer[count++] = (byte) b; + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) + || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + + if (count + len > limit) { + len = limit-count; + if(len == 0){ + return; + } + } + + System.arraycopy(b, off, buffer, count, len); + count += len; + } + + public void reset(int newlim) { + if (newlim > buffer.length) { + throw new IndexOutOfBoundsException("Limit exceeds buffer size"); + } + this.limit = newlim; + this.count = 0; + } + + public void reset() { + this.limit = buffer.length; + this.count = 0; + } + + public int getLimit() { + return limit; + } + + public byte[] getBuffer() { + return buffer; + } + + public int size() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java index 0f13ba2..44576d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java @@ -38,9 +38,6 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -/** - * - */ public class TestExecuteStreamCommand { @BeforeClass public static void init() { @@ -232,6 +229,237 @@ public class TestExecuteStreamCommand { } @Test + public void testSmallEchoPutToAttribute() throws Exception { + File dummy = new File("src/test/resources/hello.txt"); + assertTrue(dummy.exists()); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue("".getBytes()); + + if(isWindows()) { + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "/c;echo Hello"); + controller.setProperty(ExecuteStreamCommand.ARG_DELIMITER, ";"); + } else{ + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "echo"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "Hello"); + } + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + outputFlowFile.assertContentEquals(""); + String ouput = outputFlowFile.getAttribute("executeStreamCommand.output"); + assertTrue(ouput.startsWith("Hello")); + assertEquals("0", outputFlowFile.getAttribute("execution.status")); + assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command")); + } + + @Test + public void testExecuteJarPutToAttribute() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + String result = outputFlowFile.getAttribute("executeStreamCommand.output"); + outputFlowFile.assertContentEquals(dummy); + assertTrue(Pattern.compile("Test was a success\r?\n").matcher(result).find()); + assertEquals("0", outputFlowFile.getAttribute("execution.status")); + assertEquals("java", outputFlowFile.getAttribute("execution.command")); + assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5)); + String attribute = outputFlowFile.getAttribute("execution.command.args"); + String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar"; + assertEquals(expected, attribute.substring(attribute.length() - expected.length())); + } + + @Test + public void testExecuteJarToAttributeConfiguration() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue("small test".getBytes()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "10"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest"); + assertEquals(1, controller.getProcessContext().getAvailableRelationships().size()); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + outputFlowFile.assertContentEquals("small test".getBytes()); + String result = outputFlowFile.getAttribute("outputDest"); + assertTrue(Pattern.compile("Test was a").matcher(result).find()); + assertEquals("0", outputFlowFile.getAttribute("execution.status")); + assertEquals("java", outputFlowFile.getAttribute("execution.command")); + assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5)); + String attribute = outputFlowFile.getAttribute("execution.command.args"); + String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar"; + assertEquals(expected, attribute.substring(attribute.length() - expected.length())); + } + + @Test + public void testExecuteIngestAndUpdatePutToAttribute() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + File dummy100MBytes = new File("target/100MB.txt"); + FileInputStream fis = new FileInputStream(dummy); + FileOutputStream fos = new FileOutputStream(dummy100MBytes); + byte[] bytes = new byte[1024]; + assertEquals(1000, fis.read(bytes)); + fis.close(); + for (int i = 0; i < 100000; i++) { + fos.write(bytes, 0, 1000); + } + fos.close(); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy100MBytes.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + String result = flowFiles.get(0).getAttribute("outputDest"); + + assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find()); + } + + @Test + public void testLargePutToAttribute() throws IOException { + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + File dummy100MBytes = new File("target/100MB.txt"); + FileInputStream fis = new FileInputStream(dummy); + FileOutputStream fos = new FileOutputStream(dummy100MBytes); + byte[] bytes = new byte[1024]; + assertEquals(1000, fis.read(bytes)); + fis.close(); + for (int i = 0; i < 100000; i++) { + fos.write(bytes, 0, 1000); + } + fos.close(); + + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue("".getBytes()); + if(isWindows()) { + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "/c;type " + dummy100MBytes.getAbsolutePath()); + controller.setProperty(ExecuteStreamCommand.ARG_DELIMITER, ";"); + } else{ + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cat"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, dummy100MBytes.getAbsolutePath()); + } + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "256"); + + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + + flowFiles.get(0).assertAttributeEquals("execution.status", "0"); + String result = flowFiles.get(0).getAttribute("executeStreamCommand.output"); + assertTrue(Pattern.compile("a{256}").matcher(result).matches()); + } + + @Test + public void testExecuteIngestAndUpdateWithWorkingDirPutToAttribute() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "streamOutput"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + String result = flowFiles.get(0).getAttribute("streamOutput"); + + final String quotedSeparator = Pattern.quote(File.separator); + assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find()); + } + + @Test + public void testIgnoredStdinPutToAttribute() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + String result = flowFiles.get(0).getAttribute("executeStreamCommand.output"); + assertTrue("TestIngestAndUpdate.jar should not have received anything to modify", + Pattern.compile("target:ModifiedResult\r?\n?").matcher(result).find()); + } + + @Test + public void testDynamicEnvironmentPutToAttribute() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setProperty("NIFI_TEST_1", "testvalue1"); + controller.setProperty("NIFI_TEST_2", "testvalue2"); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + String result = flowFiles.get(0).getAttribute("executeStreamCommand.output"); + Set<String> dynamicEnvironmentVariables = new HashSet<>(Arrays.asList(result.split("\r?\n"))); + assertFalse("Should contain at least two environment variables starting with NIFI", dynamicEnvironmentVariables.size() < 2); + assertTrue("NIFI_TEST_1 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1")); + assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2")); + } + + @Test public void testQuotedArguments() throws Exception { List<String> args = ArgumentUtils.splitArgs("echo -n \"arg1 arg2 arg3\"", ' '); assertEquals(3, args.size()); @@ -250,4 +478,7 @@ public class TestExecuteStreamCommand { controller.assertValid(); } + private static boolean isWindows() { + return System.getProperty("os.name").toLowerCase().startsWith("windows"); + } }
