Repository: nifi Updated Branches: refs/heads/master 58aedd785 -> 310347fd6
NIFI-842 Updating org.apache.nifi.NiFi so it sends a started signal to bootstrap which can then be used to make retsart decisions Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3bfe830a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3bfe830a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3bfe830a Branch: refs/heads/master Commit: 3bfe830afc363d70925cecca4472ae34da6795bb Parents: c7f7704 Author: Bryan Bende <[email protected]> Authored: Sun Aug 23 08:02:52 2015 -0400 Committer: Bryan Bende <[email protected]> Committed: Mon Aug 24 12:00:21 2015 -0400 ---------------------------------------------------------------------- .../apache/nifi/bootstrap/BootstrapCodec.java | 16 ++++++++++ .../java/org/apache/nifi/bootstrap/RunNiFi.java | 30 +++++++++++++++++- .../java/org/apache/nifi/BootstrapListener.java | 32 +++++++++++++++----- .../src/main/java/org/apache/nifi/NiFi.java | 4 +++ 4 files changed, 74 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/3bfe830a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java index 3817875..d925fa3 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java @@ -87,6 +87,22 @@ public class BootstrapCodec { writer.flush(); } break; + case "STARTED": { + if (args.length != 1) { + throw new InvalidCommandException("STARTED command must contain a status argument"); + } + + if (!"true".equals(args[0]) && !"false".equals(args[0])) { + throw new InvalidCommandException("Invalid status for STARTED command; should be true or false, but was '" + args[0] + "'"); + } + + final boolean started = Boolean.parseBoolean(args[0]); + runner.setNiFiStarted(started); + writer.write("OK"); + writer.newLine(); + writer.flush(); + } + break; } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/3bfe830a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index c2f4feb..636fd4a 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -92,7 +92,9 @@ public class RunNiFi { private volatile long nifiPid = -1L; private volatile String secretKey; private volatile ShutdownHook shutdownHook; + private volatile boolean nifiStarted; + private final Lock startedLock = new ReentrantLock(); private final Lock lock = new ReentrantLock(); private final Condition startupCondition = lock.newCondition(); @@ -799,10 +801,18 @@ public class RunNiFi { if (autoRestartNiFi) { final File statusFile = getStatusFile(defaultLogger); if (!statusFile.exists()) { - defaultLogger.debug("Status File no longer exists. Will not restart NiFi"); + defaultLogger.info("Status File no longer exists. Will not restart NiFi"); return; } + final boolean previouslyStarted = getNifiStarted(); + if (!previouslyStarted) { + defaultLogger.info("NiFi never started. Will not restart NiFi"); + return; + } else { + setNiFiStarted(false); + } + defaultLogger.warn("Apache NiFi appears to have died. Restarting..."); process = builder.start(); handleLogging(process); @@ -973,6 +983,24 @@ public class RunNiFi { return this.ccPort; } + void setNiFiStarted(final boolean nifiStarted) { + startedLock.lock(); + try { + this.nifiStarted = nifiStarted; + } finally { + startedLock.unlock(); + } + } + + boolean getNifiStarted() { + startedLock.lock(); + try { + return nifiStarted; + } finally { + startedLock.unlock(); + } + } + private static class Status { private final Integer port; http://git-wip-us.apache.org/repos/asf/nifi/blob/3bfe830a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java index c1bdf97..373212a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -80,13 +80,37 @@ public class BootstrapListener { listenThread.start(); logger.debug("Notifying Bootstrap that local port is {}", localPort); + sendCommand("PORT", new String[] { String.valueOf(localPort), secretKey}); + } + + public void stop() { + if (listener != null) { + listener.stop(); + } + } + + public void sendStartedStatus(boolean status) throws IOException { + logger.debug("Notifying Bootstrap that the status of starting NiFi is {}", status); + sendCommand("STARTED", new String[]{ String.valueOf(status) }); + } + + private void sendCommand(final String command, final String[] args) throws IOException { try (final Socket socket = new Socket()) { socket.setSoTimeout(60000); socket.connect(new InetSocketAddress("localhost", bootstrapPort)); socket.setSoTimeout(60000); + final StringBuilder commandBuilder = new StringBuilder(command); + for (final String arg : args) { + commandBuilder.append(" ").append(arg); + } + commandBuilder.append("\n"); + + final String commandWithArgs = commandBuilder.toString(); + logger.debug("Sending command to Bootstrap: " + commandWithArgs); + final OutputStream out = socket.getOutputStream(); - out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.write((commandWithArgs).getBytes(StandardCharsets.UTF_8)); out.flush(); logger.debug("Awaiting response from Bootstrap..."); @@ -100,12 +124,6 @@ public class BootstrapListener { } } - public void stop() { - if (listener != null) { - listener.stop(); - } - } - private class Listener implements Runnable { private final ServerSocket serverSocket; http://git-wip-us.apache.org/repos/asf/nifi/blob/3bfe830a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java index ef2377f..ae4cf40 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java @@ -136,6 +136,10 @@ public class NiFi { } else { nifiServer.start(); + if (bootstrapListener != null) { + bootstrapListener.sendStartedStatus(true); + } + final long endTime = System.nanoTime(); logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds."); }
