NIFI-5024 Resolves deadlock in ExecuteStreamCommand processor

This closes #2594

Signed-off-by: Mike Thomsen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/498fd8f2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/498fd8f2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/498fd8f2

Branch: refs/heads/master
Commit: 498fd8f22dbe6ef322d77eb13d37ab8d49d69461
Parents: f69b720
Author: Nicolas Sanglard <[email protected]>
Authored: Thu Mar 29 07:59:03 2018 +0200
Committer: Mike Thomsen <[email protected]>
Committed: Thu May 31 06:35:39 2018 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   1 +
 .../standard/ExecuteStreamCommand.java          |  47 +++++----
 .../src/test/java/TestLogStdErr.java            |  30 ++++++
 .../standard/TestExecuteStreamCommand.java      |  20 ++++
 .../src/test/resources/ExecuteCommand/1mb.txt   | 101 +++++++++++++++++++
 .../resources/ExecuteCommand/TestLogStdErr.jar  | Bin 0 -> 900 bytes
 6 files changed, 179 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/498fd8f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 6f1a77d..4d328d0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -439,6 +439,7 @@
                         
<exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude>
                         
<exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude>
                         
<exclude>src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar</exclude>
+                        
<exclude>src/test/resources/ExecuteCommand/TestLogStdErr.jar</exclude>
                         
<exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude>
                         
<exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude>
                         
<exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/498fd8f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 6f23ca8..94db1c0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -18,11 +18,10 @@ package org.apache.nifi.processors.standard;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.lang.ProcessBuilder.Redirect;
 import java.util.ArrayList;
@@ -137,10 +136,10 @@ import org.apache.nifi.stream.io.StreamUtils;
 @CapabilityDescription("Executes an external command on the contents of a flow 
file, and creates a new flow file with the results of the command.")
 @DynamicProperty(name = "An environment variable name", value = "An 
environment variable value", description = "These environment variables are 
passed to the process spawned by this Processor")
 @WritesAttributes({
-    @WritesAttribute(attribute = "execution.command", description = "The name 
of the command executed"),
-    @WritesAttribute(attribute = "execution.command.args", description = "The 
semi-colon delimited list of arguments"),
-    @WritesAttribute(attribute = "execution.status", description = "The exit 
status code returned from executing the command"),
-    @WritesAttribute(attribute = "execution.error", description = "Any error 
messages returned from executing the command")})
+        @WritesAttribute(attribute = "execution.command", description = "The 
name of the command executed"),
+        @WritesAttribute(attribute = "execution.command.args", description = 
"The semi-colon delimited list of arguments"),
+        @WritesAttribute(attribute = "execution.status", description = "The 
exit status code returned from executing the command"),
+        @WritesAttribute(attribute = "execution.error", description = "Any 
error messages returned from executing the command")})
 @Restricted(
         restrictions = {
                 @Restriction(
@@ -186,7 +185,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
                 @Override
                 public ValidationResult validate(String subject, String input, 
ValidationContext context) {
                     ValidationResult result = new ValidationResult.Builder()
-                    .subject(subject).valid(true).input(input).build();
+                            .subject(subject).valid(true).input(input).build();
                     List<String> args = ArgumentUtils.splitArgs(input, 
context.getProperty(ARG_DELIMITER).getValue().charAt(0));
                     for (String arg : args) {
                         ValidationResult valResult = 
ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
@@ -300,11 +299,11 @@ public class ExecuteStreamCommand extends AbstractProcessor {
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-        .name(propertyDescriptorName)
-        .description("Sets the environment variable '" + 
propertyDescriptorName + "' for the process' environment")
-        .dynamic(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .build();
+                .name(propertyDescriptorName)
+                .description("Sets the environment variable '" + 
propertyDescriptorName + "' for the process' environment")
+                .dynamic(true)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .build();
     }
 
     @Override
@@ -351,6 +350,15 @@ public class ExecuteStreamCommand extends AbstractProcessor {
         builder.directory(dir);
         builder.redirectInput(Redirect.PIPE);
         builder.redirectOutput(Redirect.PIPE);
+        final File errorOut;
+        try {
+            errorOut = File.createTempFile("out", null);
+            builder.redirectError(errorOut);
+        } catch (IOException e) {
+            logger.error("Could not create temporary file for error logging", 
e);
+            throw new ProcessException(e);
+        }
+
         final Process process;
         try {
             process = builder.start();
@@ -360,9 +368,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
         }
         try (final OutputStream pos = process.getOutputStream();
              final InputStream pis = process.getInputStream();
-             final InputStream pes = process.getErrorStream();
-             final BufferedInputStream bis = new BufferedInputStream(pis);
-             final BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(pes))) {
+             final BufferedInputStream bis = new BufferedInputStream(pis)) {
             int exitCode = -1;
             final BufferedOutputStream bos = new BufferedOutputStream(pos);
             FlowFile outputFlowFile = putToAttribute ? inputFlowFile : 
session.create(inputFlowFile);
@@ -382,10 +388,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
             Map<String, String> attributes = new HashMap<>();
 
             final StringBuilder strBldr = new StringBuilder();
-            try {
-                String line;
-                while ((line = bufferedReader.readLine()) != null) {
-                    strBldr.append(line).append("\n");
+            try (final InputStream is = new FileInputStream(errorOut)) {
+                int c;
+                while ((c = is.read()) != -1) {
+                    strBldr.append((char) c);
                 }
             } catch (IOException e) {
                 strBldr.append("Unknown...could not read Process's Std Error");
@@ -424,6 +430,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
             // could not close Process related streams
             logger.warn("Problem terminating Process {}", new 
Object[]{process}, ex);
         } finally {
+            errorOut.delete();
             process.destroy(); // last ditch effort to clean up that process.
         }
     }
@@ -524,4 +531,4 @@ public class ExecuteStreamCommand extends AbstractProcessor {
         writerThread.setDaemon(true);
         writerThread.start();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/498fd8f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestLogStdErr.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestLogStdErr.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestLogStdErr.java
new file mode 100644
index 0000000..a6c6a37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestLogStdErr.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+public class TestLogStdErr {
+
+    public static void main(String[] args) throws IOException, 
URISyntaxException {
+        char[] chars = new char[1024 * 1024];
+        Arrays.fill(chars, 'f');
+        System.err.println(new String(chars));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/498fd8f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
index cdeec44..08282cd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
@@ -129,6 +129,26 @@ public class TestExecuteStreamCommand {
     }
 
     @Test
+    public void testLoggingToStdErr() throws IOException {
+        File exJar = new 
File("src/test/resources/ExecuteCommand/TestLogStdErr.jar");
+        File dummy = new File("src/test/resources/ExecuteCommand/1mb.txt");
+        String jarPath = exJar.getAbsolutePath();
+        exJar.setExecutable(true);
+        final TestRunner controller = 
TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue(dummy.toPath());
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, 
"-jar;" + jarPath);
+        controller.run(1);
+        
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 
1);
+        List<MockFlowFile> flowFiles = 
controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertEquals(0, flowFile.getSize());
+        assertEquals("fffffffffffffffffffffffffffffff", 
flowFile.getAttribute("execution.error").substring(0, 31));
+    }
+
+    @Test
     public void testExecuteIngestAndUpdateWithWorkingDir() throws IOException {
         File exJar = new 
File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
         File dummy = new 
File("src/test/resources/ExecuteCommand/1000bytes.txt");

Reply via email to