Repository: samza Updated Branches: refs/heads/master c99e57133 -> 13ff09024
SAMZA-1630: Log a thread dump on timeouts It would be useful to get a thread dump on timeouts, e.g. for AsyncStreamTask callback timeouts, container shutdown timeouts, heartbeat monitor graceful shutdown timeouts, etc. Author: Prateek Maheshwari <[email protected]> Author: Prateek Maheshwari <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #460 from prateekm/thread-dump-on-timeout Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/13ff0902 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/13ff0902 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/13ff0902 Branch: refs/heads/master Commit: 13ff090245b3539ee5c048c707dcee2e5bde5252 Parents: c99e571 Author: Prateek Maheshwari <[email protected]> Authored: Wed Mar 28 18:14:34 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Mar 28 18:14:34 2018 -0700 ---------------------------------------------------------------------- .../container/ContainerHeartbeatMonitor.java | 3 +++ .../SamzaContainerExceptionHandler.java | 8 ++++++-- .../apache/samza/task/TaskCallbackManager.java | 2 ++ .../apache/samza/container/SamzaContainer.scala | 3 ++- .../main/scala/org/apache/samza/util/Util.scala | 20 ++++++++++++++++++-- 5 files changed, 31 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java index 64e7450..ba21141 100644 --- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java +++ 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java @@ -24,6 +24,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +57,7 @@ public class ContainerHeartbeatMonitor { scheduler.schedule(() -> { // On timeout of container shutting down, force exit. LOG.error("Graceful shutdown timeout expired. Force exiting."); + Util.printThreadDump("Thread dump at heartbeat monitor shutdown timeout."); System.exit(1); }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS); onContainerExpired.run(); http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java index 0381794..2a01cd7 100644 --- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java +++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java @@ -20,6 +20,8 @@ package org.apache.samza.container; import java.lang.Thread.UncaughtExceptionHandler; + +import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +47,12 @@ public class SamzaContainerExceptionHandler implements UncaughtExceptionHandler */ @Override public void uncaughtException(Thread t, Throwable e) { - LOGGER.error( - String.format("Uncaught exception in thread (name=%s). 
Exiting process now.", t.getName()), e); + String msg = String.format("Uncaught exception in thread %s.", t.getName()); + LOGGER.error(msg, e); + System.err.println(msg); e.printStackTrace(System.err); try { + Util.printThreadDump("Thread dump from uncaught exception handler."); runnable.run(); } catch (Throwable throwable) { // Ignore to avoid further exception propagation http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java index c773368..036706f 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java @@ -32,6 +32,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.container.TaskName; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.util.HighResolutionClock; +import org.apache.samza.util.Util; /** @@ -100,6 +101,7 @@ class TaskCallbackManager { Runnable timerTask = new Runnable() { @Override public void run() { + Util.printThreadDump("Thread dump at task callback timeout"); String msg = "Callback for task {} " + callback.taskName + " timed out after " + timeout + " ms."; callback.failure(new SamzaException(msg)); } http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 7cc1924..a5b172d 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -940,7 +940,7 @@ class SamzaContainer( def addShutdownHook { val runLoopThread = Thread.currentThread() - shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") { + shutdownHookThread = new Thread("Samza Container Shutdown Hook Thread") { override def run() = { info("Shutting down, will wait up to %s ms." format shutdownMs) shutdownRunLoop() //TODO: Pull out shutdown hook to LocalContainerRunner or SP @@ -954,6 +954,7 @@ class SamzaContainer( info("Shutdown complete") } else { error("Did not shut down within %s ms, exiting." format shutdownMs) + Util.printThreadDump("Thread dump from Samza Container Shutdown Hook.") } } } http://git-wip-us.apache.org/repos/asf/samza/blob/13ff0902/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index ea23760..a50ffeb 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -37,7 +37,8 @@ import scala.collection.immutable.Map object Util extends Logging { - val random = new Random + val Random = new Random + val ThreadMxBean = ManagementFactory.getThreadMXBean def clock: Long = System.currentTimeMillis /** @@ -49,7 +50,7 @@ object Util extends Logging { * Get a random number >= startInclusive, and < endExclusive. */ def randomBetween(startInclusive: Int, endExclusive: Int) = - startInclusive + random.nextInt(endExclusive - startInclusive) + startInclusive + Random.nextInt(endExclusive - startInclusive) /** * Recursively remove a directory (or file), and all sub-directories. 
Equivalent @@ -431,4 +432,19 @@ object Util extends Logging { case _ => config } } + + def printThreadDump(message: String): Unit = { + try { + val threadInfo = ThreadMxBean.dumpAllThreads(true, true) + val sb = new StringBuilder + sb.append(message).append("\n") + for (ti <- threadInfo) { + sb.append(ti.toString).append("\n") + } + System.out.println(sb) + } catch { + case e: Exception => + System.out.println("Could not get and log a thread dump", e) + } + } }
