Repository: samza
Updated Branches:
  refs/heads/master c99e57133 -> 13ff09024


SAMZA-1630: Log a thread dump on timeouts

It would be useful to log a thread dump on timeouts, e.g. the AsyncStreamTask 
callback timeout, the container shutdown timeout, and the heartbeat monitor 
graceful shutdown timeout.

Author: Prateek Maheshwari <[email protected]>
Author: Prateek Maheshwari <[email protected]>

Reviewers: Jacob Maes <[email protected]>

Closes #460 from prateekm/thread-dump-on-timeout


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/13ff0902
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/13ff0902
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/13ff0902

Branch: refs/heads/master
Commit: 13ff090245b3539ee5c048c707dcee2e5bde5252
Parents: c99e571
Author: Prateek Maheshwari <[email protected]>
Authored: Wed Mar 28 18:14:34 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Mar 28 18:14:34 2018 -0700

----------------------------------------------------------------------
 .../container/ContainerHeartbeatMonitor.java    |  3 +++
 .../SamzaContainerExceptionHandler.java         |  8 ++++++--
 .../apache/samza/task/TaskCallbackManager.java  |  2 ++
 .../apache/samza/container/SamzaContainer.scala |  3 ++-
 .../main/scala/org/apache/samza/util/Util.scala | 20 ++++++++++++++++++--
 5 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 64e7450..ba21141 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +57,7 @@ public class ContainerHeartbeatMonitor {
           scheduler.schedule(() -> {
               // On timeout of container shutting down, force exit.
               LOG.error("Graceful shutdown timeout expired. Force exiting.");
+              Util.printThreadDump("Thread dump at heartbeat monitor shutdown 
timeout.");
               System.exit(1);
             }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS);
           onContainerExpired.run();

http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
 
b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
index 0381794..2a01cd7 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
@@ -20,6 +20,8 @@
 package org.apache.samza.container;
 
 import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,10 +47,12 @@ public class SamzaContainerExceptionHandler implements 
UncaughtExceptionHandler
    */
   @Override
   public void uncaughtException(Thread t, Throwable e) {
-    LOGGER.error(
-        String.format("Uncaught exception in thread (name=%s). Exiting process 
now.", t.getName()), e);
+    String msg = String.format("Uncaught exception in thread %s.", 
t.getName());
+    LOGGER.error(msg, e);
+    System.err.println(msg);
     e.printStackTrace(System.err);
     try {
+      Util.printThreadDump("Thread dump from uncaught exception handler.");
       runnable.run();
     } catch (Throwable throwable) {
       // Ignore to avoid further exception propagation

http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index c773368..036706f 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -32,6 +32,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.Util;
 
 
 /**
@@ -100,6 +101,7 @@ class TaskCallbackManager {
       Runnable timerTask = new Runnable() {
         @Override
         public void run() {
+          Util.printThreadDump("Thread dump at task callback timeout");
           String msg = "Callback for task {} " + callback.taskName + " timed 
out after " + timeout + " ms.";
           callback.failure(new SamzaException(msg));
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 7cc1924..a5b172d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -940,7 +940,7 @@ class SamzaContainer(
 
   def addShutdownHook {
     val runLoopThread = Thread.currentThread()
-    shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") {
+    shutdownHookThread = new Thread("Samza Container Shutdown Hook Thread") {
       override def run() = {
         info("Shutting down, will wait up to %s ms." format shutdownMs)
         shutdownRunLoop()  //TODO: Pull out shutdown hook to 
LocalContainerRunner or SP
@@ -954,6 +954,7 @@ class SamzaContainer(
           info("Shutdown complete")
         } else {
           error("Did not shut down within %s ms, exiting." format shutdownMs)
+          Util.printThreadDump("Thread dump from Samza Container Shutdown 
Hook.")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index ea23760..a50ffeb 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -37,7 +37,8 @@ import scala.collection.immutable.Map
 
 
 object Util extends Logging {
-  val random = new Random
+  val Random = new Random
+  val ThreadMxBean = ManagementFactory.getThreadMXBean
 
   def clock: Long = System.currentTimeMillis
   /**
@@ -49,7 +50,7 @@ object Util extends Logging {
    * Get a random number >= startInclusive, and < endExclusive.
    */
   def randomBetween(startInclusive: Int, endExclusive: Int) =
-    startInclusive + random.nextInt(endExclusive - startInclusive)
+    startInclusive + Random.nextInt(endExclusive - startInclusive)
 
   /**
    * Recursively remove a directory (or file), and all sub-directories. 
Equivalent
@@ -431,4 +432,19 @@ object Util extends Logging {
       case _ => config
     }
   }
+
+  def printThreadDump(message: String): Unit = {
+    try {
+      val threadInfo = ThreadMxBean.dumpAllThreads(true, true) // (lockedMonitors, lockedSynchronizers)
+      val sb = new StringBuilder
+      sb.append(message).append("\n")
+      for (ti <- threadInfo) {
+        sb.append(ti.toString).append("\n") // NOTE(review): OpenJDK's ThreadInfo.toString caps stacks at 8 frames
+      }
+      System.out.println(sb)
+    } catch {
+      case e: Exception =>
+        error("Could not get and log a thread dump", e) // println(msg, e) auto-tupled and dropped the stack trace
+    }
+  }
 }

Reply via email to