NIFI-262, NIFI-263: Added 'restart' and 'dump' options to nifi.sh script

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7737fbd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7737fbd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7737fbd8

Branch: refs/heads/NIFI-250
Commit: 7737fbd84d955520e2f6e23bbbfa8a5ebe43b3ed
Parents: c62aba1
Author: Mark Payne <[email protected]>
Authored: Wed Jan 14 12:24:09 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Wed Jan 14 12:24:09 2015 -0500

----------------------------------------------------------------------
 .../src/main/resources/bin/dump-nifi.bat        |  33 +++++
 .../resources/src/main/resources/bin/nifi.sh    |   4 +-
 .../java/org/apache/nifi/BootstrapListener.java | 122 ++++++++++++++++++-
 .../java/org/apache/nifi/bootstrap/RunNiFi.java |  89 ++++++++++++--
 4 files changed, 234 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
 
b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
new file mode 100644
index 0000000..71e5a1a
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
@@ -0,0 +1,33 @@
+@echo off
+rem
+rem    Licensed to the Apache Software Foundation (ASF) under one or more
+rem    contributor license agreements.  See the NOTICE file distributed with
+rem    this work for additional information regarding copyright ownership.
+rem    The ASF licenses this file to You under the Apache License, Version 2.0
+rem    (the "License"); you may not use this file except in compliance with
+rem    the License.  You may obtain a copy of the License at
+rem
+rem       http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem    Unless required by applicable law or agreed to in writing, software
+rem    distributed under the License is distributed on an "AS IS" BASIS,
+rem    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem    See the License for the specific language governing permissions and
+rem    limitations under the License.
+rem
+
+rem Use JAVA_HOME if it's set; otherwise, just use java
+IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET 
JAVA_EXE=%JAVA_HOME%\bin\java.exe)
+
+SET NIFI_ROOT=%~dp0..\
+CD /d "%NIFI_ROOT%"
+SET LIB_DIR=lib\bootstrap
+SET CONF_DIR=conf
+
+SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
+SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
+
+SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% 
org.apache.nifi.bootstrap.RunNiFi
+SET BOOTSTRAP_ACTION=dump
+
+cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
 
b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
index 163f8e2..fb0d22e 100644
--- 
a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
+++ 
b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
@@ -172,10 +172,10 @@ case "$1" in
     install)
         install "$@"
         ;;
-    start|stop|run|status)
+    start|stop|run|restart|status|dump)
         main "$@"
         ;;
     *)
-        echo "Usage nifi {start|stop|run|status|install}"
+        echo "Usage nifi {start|stop|run|restart|status|dump|install}"
         ;;
 esac

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 3393952..590797c 100644
--- 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -17,16 +17,27 @@
 package org.apache.nifi;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -162,6 +173,10 @@ public class BootstrapListener {
                                                                                
echoShutdown(socket.getOutputStream());
                                                                                
nifi.shutdownHook();
                                                                                
return;
+                                                                       case 
DUMP:
+                                                                           
logger.info("Received DUMP request from Bootstrap");
+                                                                           
writeDump(socket.getOutputStream());
+                                                                           
break;
                                                                }
                                                        } catch (final 
Throwable t) {
                                                                
logger.error("Failed to process request from Bootstrap due to " + t.toString(), 
t);
@@ -182,6 +197,110 @@ public class BootstrapListener {
        }
        
        
+       private static void writeDump(final OutputStream out) throws 
IOException {
+        final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
+        final BufferedWriter writer = new BufferedWriter(new 
OutputStreamWriter(out));
+        
+        final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
+        final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
+        final long[] monitorDeadlockThreadIds = 
mbean.findMonitorDeadlockedThreads();
+        
+        final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
+        for ( final ThreadInfo info : infos ) {
+            sortedInfos.add(info);
+        }
+        Collections.sort(sortedInfos, new Comparator<ThreadInfo>() {
+            @Override
+            public int compare(ThreadInfo o1, ThreadInfo o2) {
+                return 
o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase());
+            }
+        });
+        
+        final StringBuilder sb = new StringBuilder();
+        for ( final ThreadInfo info : sortedInfos ) {
+            sb.append("\n");
+            sb.append("\"").append(info.getThreadName()).append("\" Id=");
+            sb.append(info.getThreadId()).append(" ");
+            sb.append(info.getThreadState().toString()).append(" ");
+            
+            switch (info.getThreadState()) {
+                case BLOCKED:
+                case TIMED_WAITING:
+                case WAITING:
+                    sb.append(" on ");
+                    sb.append(info.getLockInfo());
+                    break;
+                default:
+                    break;
+            }
+            
+            if (info.isSuspended()) {
+                sb.append(" (suspended)");
+            }
+            if ( info.isInNative() ) {
+                sb.append(" (in native code)");
+            }
+            
+            if ( deadlockedThreadIds != null && deadlockedThreadIds.length > 0 
) {
+                for ( final long id : deadlockedThreadIds ) {
+                    if ( id == info.getThreadId() ) {
+                        sb.append(" ** DEADLOCKED THREAD **");
+                    }
+                }
+            }
+
+           if ( monitorDeadlockThreadIds != null && 
monitorDeadlockThreadIds.length > 0 ) {
+                for ( final long id : monitorDeadlockThreadIds ) {
+                    if ( id == info.getThreadId() ) {
+                        sb.append(" ** MONITOR-DEADLOCKED THREAD **");
+                    }
+                }
+            }
+
+            final StackTraceElement[] stackTraces = info.getStackTrace();
+            for ( final StackTraceElement element : stackTraces ) {
+                sb.append("\n\tat ").append(element);
+                
+                final MonitorInfo[] monitors = info.getLockedMonitors();
+                for ( final MonitorInfo monitor : monitors ) {
+                    if ( monitor.getLockedStackFrame().equals(element) ) {
+                        sb.append("\n\t- waiting on ").append(monitor);
+                    }
+                }
+            }
+            
+            final LockInfo[] lockInfos = info.getLockedSynchronizers();
+            if ( lockInfos.length > 0 ) {
+                sb.append("\n\t");
+                sb.append("Number of Locked Synchronizers: 
").append(lockInfos.length);
+                for ( final LockInfo lockInfo : lockInfos ) {
+                    sb.append("\n\t- ").append(lockInfo.toString());
+                }
+            }
+            
+            sb.append("\n");
+        }
+        
+        if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
+            sb.append("\n\nDEADLOCK DETECTED!");
+            sb.append("\nThe following thread IDs are deadlocked:");
+            for ( final long id : deadlockedThreadIds ) {
+                sb.append("\n").append(id);
+            }
+        }
+
+       if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length 
> 0) {
+            sb.append("\n\nMONITOR DEADLOCK DETECTED!");
+            sb.append("\nThe following thread IDs are deadlocked:");
+            for ( final long id : monitorDeadlockThreadIds ) {
+                sb.append("\n").append(id);
+            }
+        }
+
+        writer.write(sb.toString());
+        writer.flush();
+    }
+       
        private void echoPing(final OutputStream out) throws IOException {
                out.write("PING\n".getBytes(StandardCharsets.UTF_8));
                out.flush();
@@ -205,7 +324,7 @@ public class BootstrapListener {
                
                final String line = reader.readLine();
                final String[] splits = line.split(" ");
-               if ( splits.length < 0 ) {
+               if ( splits.length < 1 ) {
                        throw new IOException("Received invalid request from 
Bootstrap: " + line);
                }
                
@@ -235,6 +354,7 @@ public class BootstrapListener {
        private static class BootstrapRequest {
                public static enum RequestType {
                        SHUTDOWN,
+                       DUMP,
                        PING;
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index e8f6439..f920860 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -75,6 +76,7 @@ public class RunNiFi {
        
        public static final String SHUTDOWN_CMD = "SHUTDOWN";
        public static final String PING_CMD = "PING";
+       public static final String DUMP_CMD = "DUMP";
        
        private volatile boolean autoRestartNiFi = true;
        private volatile int ccPort = -1;
@@ -105,41 +107,52 @@ public class RunNiFi {
        private static void printUsage() {
                System.out.println("Usage:");
                System.out.println();
-               System.out.println("java org.apache.nifi.bootstrap.RunNiFi 
[<-verbose>] <command>");
+               System.out.println("java org.apache.nifi.bootstrap.RunNiFi 
[<-verbose>] <command> [options]");
                System.out.println();
                System.out.println("Valid commands include:");
                System.out.println("");
                System.out.println("Start : Start a new instance of Apache 
NiFi");
                System.out.println("Stop : Stop a running instance of Apache 
NiFi");
+               System.out.println("Restart : Stop Apache NiFi, if it is 
running, and then start a new instance");
                System.out.println("Status : Determine if there is a running 
instance of Apache NiFi");
+               System.out.println("Dump : Write a Thread Dump to the file 
specified by [options], or to the log if no file is given");
                System.out.println("Run : Start a new instance of Apache NiFi 
and monitor the Process, restarting if the instance dies");
                System.out.println();
        }
 
+       private static String[] shift(final String[] orig) {
+           return Arrays.copyOfRange(orig, 1, orig.length);
+       }
        
-       public static void main(final String[] args) throws IOException, 
InterruptedException {
-               if ( args.length < 1 || args.length > 2 ) {
+       public static void main(String[] args) throws IOException, 
InterruptedException {
+               if ( args.length < 1 || args.length > 3 ) {
                        printUsage();
                        return;
                }
                
+               File dumpFile = null;
                boolean verbose = false;
-               if ( args.length == 2 ) {
-                   if ( args[0].equals("-verbose") ) {
-                       verbose = true;
-                   } else {
-                       printUsage();
-                       return;
-                   }
+               if ( args[0].equals("-verbose") ) {
+                   verbose = true;
+                   args = shift(args);
                }
                
-               final String cmd = args.length == 1 ? args[0] : args[1];
+               final String cmd = args[0];
+           if (cmd.equals("dump") ) {
+               if ( args.length > 1 ) {
+                   dumpFile = new File(args[1]);
+               } else {
+                   dumpFile = null;
+               }
+           }
                
                switch (cmd.toLowerCase()) {
                        case "start":
                        case "run":
                        case "stop":
                        case "status":
+                       case "dump":
+                       case "restart":
                                break;
                        default:
                                printUsage();
@@ -178,6 +191,13 @@ public class RunNiFi {
                        case "status":
                                runNiFi.status();
                                break;
+                       case "restart":
+                           runNiFi.stop();
+                           runNiFi.start(false);
+                           break;
+                       case "dump":
+                           runNiFi.dump(dumpFile);
+                           break;
                }
        }
        
@@ -391,6 +411,53 @@ public class RunNiFi {
        }
        
        
+       /**
+        * Writes a NiFi thread dump to the given file; if file is null, logs 
at INFO level instead.
+        * @param dumpFile
+        * @return
+        * @throws IOException
+        */
+       public void dump(final File dumpFile) throws IOException {
+           final Integer port = getCurrentPort();
+        if ( port == null ) {
+            System.out.println("Apache NiFi is not currently running");
+        }
+        
+        final Properties nifiProps = loadProperties();
+        final String secretKey = nifiProps.getProperty("secret.key");
+
+        final StringBuilder sb = new StringBuilder();
+           try (final Socket socket = new Socket()) {
+            logger.fine("Connecting to NiFi instance");
+            socket.setSoTimeout(60000);
+            socket.connect(new InetSocketAddress("localhost", port));
+            logger.fine("Established connection to NiFi instance.");
+            socket.setSoTimeout(60000);
+            
+            logger.fine("Sending DUMP Command to port " + port);
+            final OutputStream out = socket.getOutputStream();
+            out.write((DUMP_CMD + " " + secretKey + 
"\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+            
+            final InputStream in = socket.getInputStream();
+            final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in));
+            String line;
+            while ((line = reader.readLine()) != null ) {
+                sb.append(line).append("\n");
+            }
+        }
+           
+           final String dump = sb.toString();
+           if ( dumpFile == null ) {
+               logger.info(dump);
+           } else {
+               try (final FileOutputStream fos = new 
FileOutputStream(dumpFile)) {
+                   fos.write(dump.getBytes(StandardCharsets.UTF_8));
+               }
+               logger.info("Successfully wrote thread dump to " + 
dumpFile.getAbsolutePath());
+           }
+       }
+       
        public void stop() throws IOException {
                final Integer port = getCurrentPort();
                if ( port == null ) {

Reply via email to