Repository: incubator-nifi Updated Branches: refs/heads/nifi-27 d5b20d463 -> 732cb7071
NIFI-145: Added PID detection for Linux Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d517b3fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d517b3fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d517b3fd Branch: refs/heads/nifi-27 Commit: d517b3fd0932676edddfa2cfbacb320b6142e882 Parents: 2bcd1e6 Author: Mark Payne <[email protected]> Authored: Thu Dec 11 13:40:56 2014 -0500 Committer: Mark Payne <[email protected]> Committed: Thu Dec 11 13:40:56 2014 -0500 ---------------------------------------------------------------------- .../resources/src/main/resources/bin/nifi.sh | 2 +- .../src/main/resources/conf/bootstrap.conf | 8 +- .../apache/nifi/bootstrap/BootstrapCodec.java | 1 + .../java/org/apache/nifi/bootstrap/RunNiFi.java | 439 ++++++++++++++++--- 4 files changed, 387 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d517b3fd/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh index 60afa48..9dc7ef6 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh @@ -172,7 +172,7 @@ run() { echo "Bootstrap Config File: $BOOTSTRAP_CONF" echo - exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1 + exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1 $2 } main() { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d517b3fd/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf index c45d8f8..6b32b2b 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf @@ -13,4 +13,10 @@ java.arg.2=-Xms256m java.arg.3=-Xmx512m # Enable Remote Debugging -#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 \ No newline at end of file +#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 + +# Java command to use when running NiFi +java=java + +# Username to use when running NiFi. This value will be ignored on Windows. +run.as= http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d517b3fd/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java index 8138c02..fb10930 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java @@ -27,6 +27,7 @@ import java.util.Arrays; import org.apache.nifi.bootstrap.exception.InvalidCommandException; + public class BootstrapCodec { private final RunNiFi runner; private final BufferedReader reader; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d517b3fd/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index 1b82a3c..a5987bc 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -26,10 +26,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.Reader; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -39,6 +40,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.ConsoleHandler; +import java.util.logging.Handler; +import java.util.logging.Level; /** @@ -60,6 +64,8 @@ public class RunNiFi { public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds"; public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20"; + public static final String RUN_AS_PROP = "run.as"; + public static final int MAX_RESTART_ATTEMPTS = 5; public static final int STARTUP_WAIT_SECONDS = 60; @@ -68,20 +74,32 @@ public class RunNiFi { private volatile boolean autoRestartNiFi = true; private volatile int ccPort = -1; + private volatile long nifiPid = -1L; private final Lock lock = new ReentrantLock(); private final Condition startupCondition = lock.newCondition(); private final File bootstrapConfigFile; + + private final java.util.logging.Logger logger; - public RunNiFi(final File bootstrapConfigFile) { + public RunNiFi(final File bootstrapConfigFile, final boolean verbose) { this.bootstrapConfigFile = bootstrapConfigFile; + logger = java.util.logging.Logger.getLogger("Bootstrap"); + if ( verbose ) { + logger.info("Enabling Verbose Output"); + + logger.setLevel(Level.FINE); + final Handler handler = new ConsoleHandler(); + handler.setLevel(Level.FINE); + logger.addHandler(handler); + } } private static void printUsage() { System.out.println("Usage:"); System.out.println(); - System.out.println("java org.apache.nifi.bootstrap.RunNiFi <command>"); + System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command>"); System.out.println(); System.out.println("Valid commands include:"); System.out.println(""); @@ -91,22 +109,33 @@ public class RunNiFi { System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies"); System.out.println(); } + public static void main(final String[] args) throws IOException, InterruptedException { - if ( args.length != 1 ) { + if ( args.length < 1 || args.length > 2 ) { printUsage(); return; } - switch (args[0].toLowerCase()) { + boolean verbose = false; + if ( args.length == 2 ) { + if ( args[0].equals("-verbose") ) { + verbose = true; + } else { + printUsage(); + return; + } + } + + final String cmd = args.length == 1 ? args[0] : args[1]; + + switch (cmd.toLowerCase()) { case "start": case "run": case "stop": case "status": break; default: - System.out.println("Invalid argument: " + args[0]); - System.out.println(); printUsage(); return; } @@ -128,9 +157,9 @@ public class RunNiFi { final File configFile = new File(configFilename); - final RunNiFi runNiFi = new RunNiFi(configFile); + final RunNiFi runNiFi = new RunNiFi(configFile, verbose); - switch (args[0].toLowerCase()) { + switch (cmd.toLowerCase()) { case "start": runNiFi.start(false); break; @@ -151,49 +180,189 @@ public class RunNiFi { final File confDir = bootstrapConfigFile.getParentFile(); final File nifiHome = confDir.getParentFile(); final File bin = new File(nifiHome, "bin"); - final File statusFile = new File(bin, "nifi.port"); + final File statusFile = new File(bin, "nifi.pid"); + + logger.fine("Status File: " + statusFile); + return statusFile; } + + private Properties loadProperties() throws IOException { + final Properties props = new Properties(); + final File statusFile = getStatusFile(); + if ( statusFile == null || !statusFile.exists() ) { + logger.fine("No status file to load properties from"); + return props; + } + + try (final FileInputStream fis = new FileInputStream(getStatusFile())) { + props.load(fis); + } + + logger.fine("Properties: " + props); + return props; + } + + private synchronized void saveProperties(final Properties nifiProps) throws IOException { + final File statusFile = getStatusFile(); + try (final FileOutputStream fos = new FileOutputStream(statusFile)) { + nifiProps.store(fos, null); + fos.getFD().sync(); + } + + logger.fine("Saved Properties " + nifiProps + " to " + statusFile); + } + + private boolean isPingSuccessful(final int port) { + logger.fine("Pinging " + port); + + try (final Socket socket = new Socket("localhost", port)) { + final OutputStream out = socket.getOutputStream(); + out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + logger.fine("Sent PING command"); + + final InputStream in = socket.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + final String response = reader.readLine(); + logger.fine("PING response: " + response); + + return PING_CMD.equals(response); + } catch (final IOException ioe) { + return false; + } + } + private Integer getCurrentPort() throws IOException { - try { - final File statusFile = getStatusFile(); - final byte[] info = Files.readAllBytes(statusFile.toPath()); - final String text = new String(info); - - final int port = Integer.parseInt(text); - - try (final Socket socket = new Socket("localhost", port)) { - final OutputStream out = socket.getOutputStream(); - out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8)); - out.flush(); - - final InputStream in = socket.getInputStream(); - final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - final String response = reader.readLine(); - if ( response.equals(PING_CMD) ) { - return port; - } - } catch (final IOException ioe) { - System.out.println("Found NiFi instance info at " + statusFile + " indicating that NiFi is running and listening to port " + port + " but unable to communicate with NiFi on that port. The process may have died or may be hung."); - throw ioe; - } - } catch (final Exception e) { - return null; + final Properties props = loadProperties(); + final String portVal = props.getProperty("port"); + if ( portVal == null ) { + logger.fine("No Port found in status file"); + return null; + } else { + logger.fine("Port defined in status file: " + portVal); } - return null; + final int port = Integer.parseInt(portVal); + final boolean success = isPingSuccessful(port); + if ( success ) { + logger.fine("Successful PING on port " + port); + return port; + } + + final String pid = props.getProperty("pid"); + logger.fine("PID in status file is " + pid); + if ( pid != null ) { + final boolean procRunning = isProcessRunning(pid); + if ( procRunning ) { + return port; + } else { + return null; + } + } + + return null; } + private boolean isProcessRunning(final String pid) { + try { + // We use the "ps" command to check if the process is still running. + final ProcessBuilder builder = new ProcessBuilder(); + + // ps -p <pid> -o comm= + // -> -p <pid> to filter just the pid we care about + // -> -o comm= to remove headers from the output + builder.command("ps", "-p", pid, "-o", "comm="); + final Process proc = builder.start(); + + // Read how many lines are output by the 'ps' command + int lineCount = 0; + String line; + try (final InputStream in = proc.getInputStream(); + final Reader streamReader = new InputStreamReader(in); + final BufferedReader reader = new BufferedReader(streamReader)) { + + while ((line = reader.readLine()) != null) { + if ( !line.trim().isEmpty() ) { + lineCount++; + } + } + } + + // If anything was output, the process is running. + final boolean running = lineCount > 0; + if ( running ) { + logger.fine("Process with PID " + pid + " is running"); + } else { + logger.fine("Process with PID " + pid + " is not running"); + } + + return running; + } catch (final IOException ioe) { + System.err.println("Failed to determine if Process " + pid + " is running; assuming that it is not"); + return false; + } + } + + + private Status getStatus() { + final Properties props; + try { + props = loadProperties(); + } catch (final IOException ioe) { + return new Status(null, null, false, false); + } + + if ( props == null ) { + return new Status(null, null, false, false); + } + + final String portValue = props.getProperty("port"); + final String pid = props.getProperty("pid"); + + if ( portValue == null && pid == null ) { + return new Status(null, null, false, false); + } + + Integer port = null; + boolean pingSuccess = false; + if ( portValue != null ) { + try { + port = Integer.parseInt(portValue); + pingSuccess = isPingSuccessful(port); + } catch (final NumberFormatException nfe) { + return new Status(null, null, false, false); + } + } + + if ( pingSuccess ) { + return new Status(port, pid, true, true); + } + + final boolean alive = (pid == null) ? false : isProcessRunning(pid); + return new Status(port, pid, pingSuccess, alive); + } + public void status() throws IOException { - final Integer port = getCurrentPort(); - if ( port == null ) { - System.out.println("Apache NiFi does not appear to be running"); - } else { - System.out.println("Apache NiFi is currently running, listening on port " + port); - } - return; + final Status status = getStatus(); + if ( status.isRespondingToPing() ) { + logger.info("Apache NiFi is currently running, listening to Bootstrap on port " + status.getPort() + + ", PID=" + (status.getPid() == null ? "unknkown" : status.getPid())); + return; + } + + if ( status.isProcessRunning() ) { + logger.info("Apache NiFi is running at PID " + status.getPid() + " but is not responding to ping requests"); + return; + } + + if ( status.getPid() == null ) { + logger.info("Apache NiFi is not responding to Ping requests. The process may have died or may be hung"); + } else { + logger.info("Apache NiFi is not running"); + } } @@ -209,6 +378,7 @@ public class RunNiFi { socket.connect(new InetSocketAddress("localhost", port)); socket.setSoTimeout(60000); + logger.fine("Sending SHUTDOWN Command to port " + port); final OutputStream out = socket.getOutputStream(); out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); @@ -216,23 +386,72 @@ public class RunNiFi { final InputStream in = socket.getInputStream(); final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final String response = reader.readLine(); + + logger.fine("Received response to SHUTDOWN command: " + response); + if ( SHUTDOWN_CMD.equals(response) ) { - System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now"); + logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now"); + + final Properties nifiProps = loadProperties(); + final String pid = nifiProps.getProperty("pid"); + if ( pid != null ) { + + final Properties bootstrapProperties = new Properties(); + try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { + bootstrapProperties.load(fis); + } + + String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE); + int gracefulShutdownSeconds; + try { + gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); + } catch (final NumberFormatException nfe) { + gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE); + } + + final long startWait = System.nanoTime(); + while ( isProcessRunning(pid) ) { + logger.info("Waiting for Apache NiFi to finish shutting down..."); + final long waitNanos = System.nanoTime() - startWait; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if ( waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0 ) { + if ( isProcessRunning(pid) ) { + logger.warning("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process."); + try { + killProcess(pid); + } catch (final IOException ioe) { + logger.severe("Failed to kill Process with PID " + pid); + } + } + break; + } else { + try { + Thread.sleep(2000L); + } catch (final InterruptedException ie) {} + } + } + + logger.info("NiFi has finished shutting down."); + } final File statusFile = getStatusFile(); if ( !statusFile.delete() ) { - System.err.println("Failed to delete status file " + statusFile + "; this file should be cleaned up manually"); + logger.severe("Failed to delete status file " + statusFile + "; this file should be cleaned up manually"); } } else { - System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response); + logger.severe("When sending SHUTDOWN command to NiFi, got unexpected response " + response); } } catch (final IOException ioe) { - System.err.println("Failed to communicate with Apache NiFi"); + logger.severe("Failed to send shutdown command to port " + port + " due to " + ioe); return; } } + private static void killProcess(final String pid) throws IOException { + Runtime.getRuntime().exec(new String[] {"kill", "-9", pid}); + } + public static boolean isAlive(final Process process) { try { process.exitValue(); @@ -246,7 +465,7 @@ public class RunNiFi { public void start(final boolean monitor) throws IOException, InterruptedException { final Integer port = getCurrentPort(); if ( port != null ) { - System.out.println("Apache NiFi is already running, listening on port " + port); + System.out.println("Apache NiFi is already running, listening to Bootstrap on port " + port); return; } @@ -344,7 +563,20 @@ public class RunNiFi { final NiFiListener listener = new NiFiListener(); final int listenPort = listener.start(this); + String runAs = isWindows() ? null : props.get(RUN_AS_PROP); + if ( runAs != null ) { + runAs = runAs.trim(); + if ( runAs.isEmpty() ) { + runAs = null; + } + } + final List<String> cmd = new ArrayList<>(); + if ( runAs != null ) { + cmd.add("sudo"); + cmd.add("-u"); + cmd.add(runAs); + } cmd.add(javaCmd); cmd.add("-classpath"); cmd.add(classPath); @@ -361,9 +593,9 @@ public class RunNiFi { cmdBuilder.append(s).append(" "); } - System.out.println("Starting Apache NiFi..."); - System.out.println("Working Directory: " + workingDir.getAbsolutePath()); - System.out.println("Command: " + cmdBuilder.toString()); + logger.info("Starting Apache NiFi..."); + logger.info("Working Directory: " + workingDir.getAbsolutePath()); + logger.info("Command: " + cmdBuilder.toString()); if ( monitor ) { String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP); @@ -383,6 +615,13 @@ public class RunNiFi { } Process process = builder.start(); + Long pid = getPid(process); + if ( pid != null ) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(properties); + } ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); final Runtime runtime = Runtime.getRuntime(); @@ -404,18 +643,26 @@ public class RunNiFi { } if (autoRestartNiFi) { - System.out.println("Apache NiFi appears to have died. Restarting..."); + logger.warning("Apache NiFi appears to have died. Restarting..."); process = builder.start(); + pid = getPid(process); + if ( pid != null ) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(properties); + } + shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); runtime.addShutdownHook(shutdownHook); final boolean started = waitForStart(); if ( started ) { - System.out.println("Successfully started Apache NiFi"); + logger.info("Successfully started Apache NiFi" + (pid == null ? "" : " with PID " + pid)); } else { - System.err.println("Apache NiFi does not appear to have started"); + logger.severe("Apache NiFi does not appear to have started"); } } else { return; @@ -423,13 +670,22 @@ public class RunNiFi { } } } else { - builder.start(); + final Process process = builder.start(); + final Long pid = getPid(process); + + if ( pid != null ) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(properties); + } + boolean started = waitForStart(); if ( started ) { - System.out.println("Successfully started Apache NiFi"); + logger.info("Successfully started Apache NiFi" + (pid == null ? "" : " with PID " + pid)); } else { - System.err.println("Apache NiFi does not appear to have started"); + logger.severe("Apache NiFi does not appear to have started"); } listener.stop(); @@ -437,6 +693,30 @@ public class RunNiFi { } + private Long getPid(final Process process) { + try { + final Class<?> procClass = process.getClass(); + final Field pidField = procClass.getDeclaredField("pid"); + pidField.setAccessible(true); + final Object pidObject = pidField.get(process); + + logger.fine("PID Object = " + pidObject); + + if ( pidObject instanceof Number ) { + return ((Number) pidObject).longValue(); + } + return null; + } catch (final IllegalAccessException | NoSuchFieldException nsfe) { + logger.fine("Could not find PID for child process due to " + nsfe); + return null; + } + } + + private boolean isWindows() { + final String osName = System.getProperty("os.name"); + return osName != null && osName.toLowerCase().contains("win"); + } + private boolean waitForStart() { lock.lock(); try { @@ -482,17 +762,54 @@ public class RunNiFi { this.ccPort = port; final File statusFile = getStatusFile(); - try (final FileOutputStream fos = new FileOutputStream(statusFile)) { - fos.write(String.valueOf(port).getBytes(StandardCharsets.UTF_8)); - fos.getFD().sync(); + + final Properties nifiProps = new Properties(); + if ( nifiPid != -1 ) { + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + } + nifiProps.setProperty("port", String.valueOf(ccPort)); + try { + saveProperties(nifiProps); } catch (final IOException ioe) { - System.err.println("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe); + logger.warning("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe); } - System.out.println("Apache NiFi now running and listening for requests on port " + port); + logger.info("Apache NiFi now running and listening for Bootstrap requests on port " + port); } int getNiFiCommandControlPort() { return this.ccPort; } + + + private static class Status { + private final Integer port; + private final String pid; + + private final Boolean respondingToPing; + private final Boolean processRunning; + + public Status(final Integer port, final String pid, final Boolean respondingToPing, final Boolean processRunning) { + this.port = port; + this.pid = pid; + this.respondingToPing = respondingToPing; + this.processRunning = processRunning; + } + + public String getPid() { + return pid; + } + + public Integer getPort() { + return port; + } + + public boolean isRespondingToPing() { + return Boolean.TRUE.equals(respondingToPing); + } + + public boolean isProcessRunning() { + return Boolean.TRUE.equals(processRunning); + } + } }
