NIFI-5024 Resolves deadlock in ExecuteStreamCommand processor This closes #2594
Signed-off-by: Mike Thomsen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/498fd8f2 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/498fd8f2 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/498fd8f2 Branch: refs/heads/master Commit: 498fd8f22dbe6ef322d77eb13d37ab8d49d69461 Parents: f69b720 Author: Nicolas Sanglard <[email protected]> Authored: Thu Mar 29 07:59:03 2018 +0200 Committer: Mike Thomsen <[email protected]> Committed: Thu May 31 06:35:39 2018 -0400 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 1 + .../standard/ExecuteStreamCommand.java | 47 +++++---- .../src/test/java/TestLogStdErr.java | 30 ++++++ .../standard/TestExecuteStreamCommand.java | 20 ++++ .../src/test/resources/ExecuteCommand/1mb.txt | 101 +++++++++++++++++++ .../resources/ExecuteCommand/TestLogStdErr.jar | Bin 0 -> 900 bytes 6 files changed, 179 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/498fd8f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 6f1a77d..4d328d0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -439,6 +439,7 @@ <exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude> <exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude> <exclude>src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar</exclude> + <exclude>src/test/resources/ExecuteCommand/TestLogStdErr.jar</exclude> <exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude> <exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude> <exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude> http://git-wip-us.apache.org/repos/asf/nifi/blob/498fd8f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index 6f23ca8..94db1c0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -18,11 +18,10 @@ package org.apache.nifi.processors.standard; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; import java.lang.ProcessBuilder.Redirect; import java.util.ArrayList; @@ -137,10 +136,10 @@ import org.apache.nifi.stream.io.StreamUtils; @CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.") @DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor") @WritesAttributes({ - @WritesAttribute(attribute = "execution.command", description = "The name of the command executed"), - @WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"), - @WritesAttribute(attribute = "execution.status", description = "The exit status code returned from executing the command"), - @WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command")}) + @WritesAttribute(attribute = "execution.command", description = "The name of the command executed"), + @WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"), + @WritesAttribute(attribute = "execution.status", description = "The exit status code returned from executing the command"), + @WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command")}) @Restricted( restrictions = { @Restriction( @@ -186,7 +185,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override public ValidationResult validate(String subject, String input, ValidationContext context) { ValidationResult result = new ValidationResult.Builder() - .subject(subject).valid(true).input(input).build(); + .subject(subject).valid(true).input(input).build(); List<String> args = ArgumentUtils.splitArgs(input, context.getProperty(ARG_DELIMITER).getValue().charAt(0)); for (String arg : args) { ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context); @@ -300,11 +299,11 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name(propertyDescriptorName) + .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); } @Override @@ -351,6 +350,15 @@ public class ExecuteStreamCommand extends AbstractProcessor { builder.directory(dir); builder.redirectInput(Redirect.PIPE); builder.redirectOutput(Redirect.PIPE); + final File errorOut; + try { + errorOut = File.createTempFile("out", null); + builder.redirectError(errorOut); + } catch (IOException e) { + logger.error("Could not create temporary file for error logging", e); + throw new ProcessException(e); + } + final Process process; try { process = builder.start(); @@ -360,9 +368,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { } try (final OutputStream pos = process.getOutputStream(); final InputStream pis = process.getInputStream(); - final InputStream pes = process.getErrorStream(); - final BufferedInputStream bis = new BufferedInputStream(pis); - final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) { + final BufferedInputStream bis = new BufferedInputStream(pis)) { int exitCode = -1; final BufferedOutputStream bos = new BufferedOutputStream(pos); FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile); @@ -382,10 +388,10 @@ public class ExecuteStreamCommand extends AbstractProcessor { Map<String, String> attributes = new HashMap<>(); final StringBuilder strBldr = new StringBuilder(); - try { - String line; - while ((line = bufferedReader.readLine()) != null) { - strBldr.append(line).append("\n"); + try (final InputStream is = new FileInputStream(errorOut)) { + int c; + while ((c = is.read()) != -1) { + strBldr.append((char) c); } } catch (IOException e) { strBldr.append("Unknown...could not read Process's Std Error"); @@ -424,6 +430,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { // could not close Process related streams logger.warn("Problem terminating Process {}", new Object[]{process}, ex); } finally { + errorOut.delete(); process.destroy(); // last ditch effort to clean up that process. } } @@ -524,4 +531,4 @@ public class ExecuteStreamCommand extends AbstractProcessor { writerThread.setDaemon(true); writerThread.start(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/498fd8f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestLogStdErr.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestLogStdErr.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestLogStdErr.java new file mode 100644 index 0000000..a6c6a37 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestLogStdErr.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; + +public class TestLogStdErr { + + public static void main(String[] args) throws IOException, URISyntaxException { + char[] chars = new char[1024 * 1024]; + Arrays.fill(chars, 'f'); + System.err.println(new String(chars)); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/498fd8f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java index cdeec44..08282cd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java @@ -129,6 +129,26 @@ public class TestExecuteStreamCommand { } @Test + public void testLoggingToStdErr() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestLogStdErr.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1mb.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + MockFlowFile flowFile = flowFiles.get(0); + assertEquals(0, flowFile.getSize()); + assertEquals("fffffffffffffffffffffffffffffff", flowFile.getAttribute("execution.error").substring(0, 31)); + } + + @Test public void testExecuteIngestAndUpdateWithWorkingDir() throws IOException { File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
