Repository: incubator-nifi Updated Branches: refs/heads/develop 87b07384a -> 6f1c12f5e
NIFI-187: Fixed bug that prevents nifi from shutting down properly when RemoteProcessGroup is present on graph Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a2fd2636 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a2fd2636 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a2fd2636 Branch: refs/heads/develop Commit: a2fd2636d011855759846e3ef99172ef905a3d33 Parents: d850768 Author: Mark Payne <[email protected]> Authored: Fri Dec 19 16:03:08 2014 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Dec 19 16:03:08 2014 -0500 ---------------------------------------------------------------------- .../org/apache/nifi/groups/RemoteProcessGroup.java | 2 ++ .../org/apache/nifi/groups/StandardProcessGroup.java | 4 ++++ .../apache/nifi/remote/StandardRemoteProcessGroup.java | 5 +++++ .../main/java/org/apache/nifi/bootstrap/RunNiFi.java | 13 ++++++++++--- .../java/org/apache/nifi/bootstrap/ShutdownHook.java | 11 +++++++++-- 5 files changed, 30 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a2fd2636/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 3acd1d3..e0cca64 100644 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -47,6 +47,8 @@ public interface RemoteProcessGroup { void setComments(String comments); + void shutdown(); + /** * Returns the name of this RemoteProcessGroup. The value returned will * never be null. If unable to communicate with the remote instance, the URI http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a2fd2636/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 1064536..8cff5dd 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -333,6 +333,10 @@ public final class StandardProcessGroup implements ProcessGroup { } } + for ( final RemoteProcessGroup rpg : procGroup.getRemoteProcessGroups() ) { + rpg.shutdown(); + } + // Recursively shutdown child groups. for (final ProcessGroup group : procGroup.getProcessGroups()) { shutdown(group); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a2fd2636/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index d3fb41f..b2f541c 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -285,6 +285,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } @Override + public void shutdown() { + backgroundThreadExecutor.shutdown(); + } + + @Override public String getIdentifier() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a2fd2636/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index 437493e..0f97f2d 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.attribute.PosixFilePermission; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -80,6 +79,8 @@ public class RunNiFi { private volatile boolean autoRestartNiFi = true; private volatile int ccPort = -1; private volatile long nifiPid = -1L; + private volatile String secretKey; + private volatile ShutdownHook shutdownHook; private final Lock lock = new ReentrantLock(); private final Condition startupCondition = lock.newCondition(); @@ -675,7 +676,7 @@ public class RunNiFi { saveProperties(nifiProps); } - ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); + shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds); final Runtime runtime = Runtime.getRuntime(); runtime.addShutdownHook(shutdownHook); @@ -706,7 +707,7 @@ public class RunNiFi { saveProperties(nifiProps); } - shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); + shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds); runtime.addShutdownHook(shutdownHook); final boolean started = waitForStart(); @@ -812,6 +813,12 @@ public class RunNiFi { void setNiFiCommandControlPort(final int port, final String secretKey) { this.ccPort = port; + this.secretKey = secretKey; + + if ( shutdownHook != null ) { + shutdownHook.setSecretKey(secretKey); + } + final File statusFile = getStatusFile(); final Properties nifiProps = new Properties(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a2fd2636/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java index 3c5ed1f..3d3a241 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java @@ -28,12 +28,19 @@ public class ShutdownHook extends Thread { private final RunNiFi runner; private final int gracefulShutdownSeconds; - public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final int gracefulShutdownSeconds) { + private volatile String secretKey; + + public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds) { this.nifiProcess = nifiProcess; this.runner = runner; + this.secretKey = secretKey; this.gracefulShutdownSeconds = gracefulShutdownSeconds; } + void setSecretKey(final String secretKey) { + this.secretKey = secretKey; + } + @Override public void run() { runner.setAutoRestartNiFi(false); @@ -44,7 +51,7 @@ public class ShutdownHook extends Thread { try { final Socket socket = new Socket("localhost", ccPort); final OutputStream out = socket.getOutputStream(); - out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8)); + out.write(("SHUTDOWN " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); socket.close();
