Repository: nifi
Updated Branches:
  refs/heads/0.x 8e190512f -> 7e8dbcdb9


NIFI-1905 enabled ExecuteProcess to terminate process

NIFI-1905 polishing

NIFI-1905 changed WARN to INFO during shutdown
This closes #456


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7e8dbcdb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7e8dbcdb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7e8dbcdb

Branch: refs/heads/0.x
Commit: 7e8dbcdb9e91881b0af98d151f2e0488fd124bda
Parents: 8e19051
Author: Oleg Zhurakousky <[email protected]>
Authored: Fri May 20 09:05:44 2016 -0400
Committer: Oleg Zhurakousky <[email protected]>
Committed: Sun May 22 10:37:15 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/ExecuteProcess.java     | 32 ++++++++++++++------
 .../processors/standard/TestExecuteProcess.java | 31 +++++++++++++++++++
 2 files changed, 53 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7e8dbcdb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
index fd6bb05..74d05cd 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
@@ -132,6 +132,8 @@ public class ExecuteProcess extends AbstractProcessor {
     .description("All created FlowFiles are routed to this relationship")
     .build();
 
+    private volatile Process externalProcess;
+
     private volatile ExecutorService executor;
     private Future<?> longRunningProcess;
     private AtomicBoolean failure = new AtomicBoolean(false);
@@ -181,7 +183,14 @@ public class ExecuteProcess extends AbstractProcessor {
 
     @OnUnscheduled
     public void shutdownExecutor() {
-        executor.shutdown();
+        try {
+            executor.shutdown();
+        } finally {
+            if (this.externalProcess.isAlive()) {
+                this.getLogger().info("Process hasn't terminated, forcing the 
interrupt");
+                this.externalProcess.destroyForcibly();
+            }
+        }
     }
 
     @Override
@@ -299,14 +308,14 @@ public class ExecuteProcess extends AbstractProcessor {
         }
 
         getLogger().info("Start creating new Process > {} ", new Object[] { 
commandStrings });
-        final Process newProcess = 
builder.redirectErrorStream(redirectErrorStream).start();
+        this.externalProcess = 
builder.redirectErrorStream(redirectErrorStream).start();
 
         // Submit task to read error stream from process
         if (!redirectErrorStream) {
             executor.submit(new Runnable() {
                 @Override
                 public void run() {
-                    try (final BufferedReader reader = new BufferedReader(new 
InputStreamReader(newProcess.getErrorStream()))) {
+                    try (final BufferedReader reader = new BufferedReader(new 
InputStreamReader(externalProcess.getErrorStream()))) {
                         while (reader.read() >= 0) {
                         }
                     } catch (final IOException ioe) {
@@ -324,7 +333,7 @@ public class ExecuteProcess extends AbstractProcessor {
                     if (batchNanos == null) {
                         // if we aren't batching, just copy the stream from the
                         // process to the flowfile.
-                        try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(newProcess.getInputStream())) {
+                        try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(externalProcess.getInputStream())) {
                             final byte[] buffer = new byte[4096];
                             int len;
                             while ((len = bufferedIn.read(buffer)) > 0) {
@@ -351,7 +360,7 @@ public class ExecuteProcess extends AbstractProcessor {
                         // Also, we don't want that text to get split up in the
                         // middle of a line, so we use BufferedReader
                         // to read lines of text and write them as lines of 
text.
-                        try (final BufferedReader reader = new 
BufferedReader(new InputStreamReader(newProcess.getInputStream()))) {
+                        try (final BufferedReader reader = new 
BufferedReader(new InputStreamReader(externalProcess.getInputStream()))) {
                             String line;
 
                             while ((line = reader.readLine()) != null) {
@@ -367,13 +376,15 @@ public class ExecuteProcess extends AbstractProcessor {
                     failure.set(true);
                     throw ioe;
                 } finally {
-                    int exitCode;
                     try {
-                        exitCode = newProcess.exitValue();
-                    } catch (final Exception e) {
-                        exitCode = -99999;
+                        // Since we are going to exit anyway, one sec gives it 
an extra chance to exit gracefully.
+                        // In the future consider exposing it via 
configuration.
+                        boolean terminated = externalProcess.waitFor(1000, 
TimeUnit.MILLISECONDS);
+                        int exitCode = terminated ? 
externalProcess.exitValue() : -9999;
+                        getLogger().info("Process finished with exit code {} 
", new Object[] { exitCode });
+                    } catch (InterruptedException e1) {
+                        Thread.currentThread().interrupt();
                     }
-                    getLogger().info("Process finished with exit code {} ", 
new Object[] { exitCode });
                 }
 
                 return null;
@@ -412,6 +423,7 @@ public class ExecuteProcess extends AbstractProcessor {
             try {
                 Thread.sleep(millis);
             } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7e8dbcdb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
index c91d24d..160bbdb 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
@@ -18,11 +18,15 @@ package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.ArgumentUtils;
@@ -97,6 +101,33 @@ public class TestExecuteProcess {
         }
     }
 
+    @Test
+    public void validateProcessInterruptOnStop() throws Exception {
+        final TestRunner runner = 
TestRunners.newTestRunner(ExecuteProcess.class);
+        runner.setProperty(ExecuteProcess.COMMAND, "ping");
+        runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, 
"nifi.apache.org");
+        runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
+
+        runner.run();
+        Thread.sleep(500);
+        ExecuteProcess processor = (ExecuteProcess) runner.getProcessor();
+        try {
+            Field executorF = 
ExecuteProcess.class.getDeclaredField("executor");
+            executorF.setAccessible(true);
+            ExecutorService executor = (ExecutorService) 
executorF.get(processor);
+            assertTrue(executor.isShutdown());
+            assertTrue(executor.isTerminated());
+
+            Field processF = 
ExecuteProcess.class.getDeclaredField("externalProcess");
+            processF.setAccessible(true);
+            Process process = (Process) processF.get(processor);
+            assertFalse(process.isAlive());
+        } catch (Exception e) {
+            fail();
+        }
+
+    }
+
     // @Test
     public void testBigBinaryInputData() {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", 
"TRACE");

Reply via email to