Repository: incubator-reef
Updated Branches:
  refs/heads/master fffee8545 -> d9f83715b


[REEF-309] Add deadlock information to thread dump on driver force close

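  This PR adds deadlock information to the thread dump that is logged when
  the driver is force-closed. It adds new static getFormattedDeadlockInfo
  methods to ThreadLogger, backed by a new DeadlockInfo class. Per-thread
  deadlock details are only included when a deadlock exists.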

JIRA:
  [REEF-309] https://issues.apache.org/jira/browse/REEF-309

Pull Request:
  Closes #179

Author:
  Brian Cho [email protected]


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/d9f83715
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/d9f83715
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/d9f83715

Branch: refs/heads/master
Commit: d9f83715b39c1165b96b582405d49a4241efd9cd
Parents: fffee85
Author: Brian Cho <[email protected]>
Authored: Thu May 7 16:56:18 2015 +0900
Committer: Byung-Gon Chun <[email protected]>
Committed: Fri May 8 13:10:01 2015 -0700

----------------------------------------------------------------------
 .../defaults/DefaultClientCloseHandler.java     |   9 +-
 .../java/org/apache/reef/util/DeadlockInfo.java | 101 ++++++++++++
 .../java/org/apache/reef/util/ThreadLogger.java |  48 ++++++
 .../org/apache/reef/util/DeadlockInfoTest.java  | 159 +++++++++++++++++++
 4 files changed, 314 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d9f83715/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
index 42722dd..3e44637 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
@@ -38,9 +38,13 @@ public final class DefaultClientCloseHandler implements EventHandler<Void> {
 
   @Override
   public void onNext(final Void aVoid) {
-    final String message = ThreadLogger.getFormattedThreadList(
+    final String threads = ThreadLogger.getFormattedThreadList(
         "Received a close message from the client, but no handler was bound for it. Active threads: ");
-    LOG.log(Level.WARNING, message);
-    throw new RuntimeException(message);
+    LOG.log(Level.WARNING, threads);
+    // Also log any deadlocked threads, and include them in the exception message below.
+    final String deadlocks = ThreadLogger.getFormattedDeadlockInfo("Deadlocked threads: ");
+    LOG.log(Level.WARNING, deadlocks);
+
+    throw new RuntimeException(threads + "\n" + deadlocks);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d9f83715/lang/java/reef-common/src/main/java/org/apache/reef/util/DeadlockInfo.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/DeadlockInfo.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/DeadlockInfo.java
new file mode 100644
index 0000000..24eae01
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/DeadlockInfo.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+import javax.annotation.Nullable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides a view into deadlocked threads for logging or debugging purposes.
+ * Backed by ThreadMXBean; the deadlock snapshot is taken once, when the object is constructed.
+ */
+final class DeadlockInfo {
+  private final ThreadInfo[] deadlockedThreads;
+  private final Map<ThreadInfo, Map<StackTraceElement, List<MonitorInfo>>> monitorLockedElements;
+
+  /**
+   * Takes a snapshot of the currently deadlocked threads, including the monitors and
+   * synchronizers that each of them has locked.
+   */
+  public DeadlockInfo() {
+    final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+    // findDeadlockedThreads() returns null (not an empty array) when no threads are deadlocked,
+    // and passing null to getThreadInfo() fails with a NullPointerException.
+    final long[] deadlockedThreadIds = mxBean.findDeadlockedThreads();
+    if (null == deadlockedThreadIds) {
+      deadlockedThreads = new ThreadInfo[0];
+    } else {
+      deadlockedThreads = mxBean.getThreadInfo(deadlockedThreadIds, true, true);
+    }
+    monitorLockedElements = new HashMap<>();
+    for (final ThreadInfo threadInfo : deadlockedThreads) {
+      monitorLockedElements.put(threadInfo, constructMonitorLockedElements(threadInfo));
+    }
+  }
+
+  /**
+   * @return An array of deadlocked threads, or an empty array if no threads were deadlocked
+   * when this object was constructed
+   */
+  public ThreadInfo[] getDeadlockedThreads() {
+    return deadlockedThreads;
+  }
+
+  /**
+   * Get a list of monitor locks that were acquired by this thread at this stack element.
+   * Stack elements are matched by equality, so identical frames (e.g. from recursion) share their monitors.
+   *
+   * @param threadInfo The thread that created the stack element, as returned by getDeadlockedThreads()
+   * @param stackTraceElement The stack element
+   * @return Unmodifiable list of monitor locks that were acquired by this thread at this stack element,
+   * or an empty list if none were acquired
+   */
+  public List<MonitorInfo> getMonitorLockedElements(final ThreadInfo threadInfo,
+                                                     final StackTraceElement stackTraceElement) {
+    final Map<StackTraceElement, List<MonitorInfo>> elementMap = monitorLockedElements.get(threadInfo);
+    if (null == elementMap) {
+      return Collections.emptyList();
+    }
+
+    final List<MonitorInfo> monitorList = elementMap.get(stackTraceElement);
+    if (null == monitorList) {
+      return Collections.emptyList();
+    }
+
+    return Collections.unmodifiableList(monitorList);
+  }
+
+  /**
+   * Get a string identifying the lock that this thread is waiting on.
+   *
+   * @param threadInfo The thread to inspect
+   * @return A string identifying the lock that this thread is waiting on, or null if the thread is not
+   * waiting on a lock
+   */
+  @Nullable
+  public String getWaitingLockString(final ThreadInfo threadInfo) {
+    if (null == threadInfo.getLockInfo()) {
+      return null;
+    } else {
+      return threadInfo.getLockName() + " held by " + threadInfo.getLockOwnerName();
+    }
+  }
+
+  /**
+   * Groups the monitors locked by the given thread by the stack frame at which each was locked.
+   */
+  private static Map<StackTraceElement, List<MonitorInfo>> constructMonitorLockedElements(
+      final ThreadInfo threadInfo) {
+    final Map<StackTraceElement, List<MonitorInfo>> elementMap = new HashMap<>();
+    for (final MonitorInfo monitorInfo : threadInfo.getLockedMonitors()) {
+      final StackTraceElement lockedStackFrame = monitorInfo.getLockedStackFrame();
+      List<MonitorInfo> monitorInfoList = elementMap.get(lockedStackFrame);
+      if (null == monitorInfoList) {
+        monitorInfoList = new LinkedList<>();
+        elementMap.put(lockedStackFrame, monitorInfoList);
+      }
+      monitorInfoList.add(monitorInfo);
+    }
+    return elementMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d9f83715/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java
index 9a47184..dd54bf9 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/ThreadLogger.java
@@ -18,6 +18,9 @@
  */
 package org.apache.reef.util;
 
+import java.lang.management.LockInfo;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -84,6 +87,63 @@ public final class ThreadLogger {
   }
 
   /**
+   * Produces a String representation of threads that are deadlocked, including lock information.
+   * If no threads are deadlocked, the result is the prefix followed by threadPrefix and "none".
+   *
+   * @param prefix             The prefix of the string returned.
+   * @param threadPrefix       Printed before each thread, e.g. "\n\t" to create an indented list.
+   * @param stackElementPrefix Printed before each stack trace element, e.g. "\n\t\t" to create an indented list.
+   * @return a String representation of threads that are deadlocked, including lock information
+   */
+  public static String getFormattedDeadlockInfo(
+      final String prefix, final String threadPrefix, final String stackElementPrefix) {
+    final StringBuilder message = new StringBuilder(prefix);
+
+    final DeadlockInfo deadlockInfo = new DeadlockInfo();
+    final ThreadInfo[] deadlockedThreads = deadlockInfo.getDeadlockedThreads();
+    if (deadlockedThreads.length == 0) {
+      // Make an empty result explicit rather than logging a bare prefix.
+      return message.append(threadPrefix).append("none").toString();
+    }
+
+    for (final ThreadInfo threadInfo : deadlockedThreads) {
+      message.append(threadPrefix).append("Thread '").append(threadInfo.getThreadName())
+          .append("' with state ").append(threadInfo.getThreadState());
+
+      boolean firstElement = true;
+      for (final StackTraceElement stackTraceElement : threadInfo.getStackTrace()) {
+        message.append(stackElementPrefix).append("at ").append(stackTraceElement);
+        if (firstElement) {
+          // The lock being waited on is reported once, right after the top stack frame.
+          final String waitingLockString = deadlockInfo.getWaitingLockString(threadInfo);
+          if (waitingLockString != null) {
+            message.append(stackElementPrefix).append("- waiting to lock: ").append(waitingLockString);
+          }
+          firstElement = false;
+        }
+        for (final MonitorInfo info : deadlockInfo.getMonitorLockedElements(threadInfo, stackTraceElement)) {
+          message.append(stackElementPrefix).append("- locked: ").append(info);
+        }
+      }
+      for (final LockInfo lockInfo : threadInfo.getLockedSynchronizers()) {
+        message.append(stackElementPrefix).append("* holds locked synchronizer: ").append(lockInfo);
+      }
+    }
+
+    return message.toString();
+  }
+
+  /**
+   * Same as <code>getFormattedDeadlockInfo(prefix, "\n\t", "\n\t\t")</code>.
+   *
+   * @param prefix The prefix of the string returned.
+   * @return a String representation of threads that are deadlocked, including lock information
+   */
+  public static String getFormattedDeadlockInfo(final String prefix) {
+    return getFormattedDeadlockInfo(prefix, "\n\t", "\n\t\t");
+  }
+
+  /**
    * An example how to use the above methods.
    *
    * @param args ignored.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d9f83715/lang/java/reef-common/src/test/java/org/apache/reef/util/DeadlockInfoTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/util/DeadlockInfoTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/util/DeadlockInfoTest.java
new file mode 100644
index 0000000..7993b4b
--- /dev/null
+++ b/lang/java/reef-common/src/test/java/org/apache/reef/util/DeadlockInfoTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Test DeadlockInfo by creating a deadlock.
+ */
+public final class DeadlockInfoTest {
+  private static final Logger LOG = Logger.getLogger(DeadlockInfoTest.class.getName());
+
+  /**
+   * Upper bound on how long each test thread waits at the barrier, and on how long
+   * setUpClass waits for the deadlock to be detected.
+   */
+  private static final long TIMEOUT_MILLIS = 10000;
+
+  /**
+   * How often setUpClass checks whether the deadlock has been detected.
+   */
+  private static final long POLL_INTERVAL_MILLIS = 10;
+
+  /**
+   * Create a deadlock consisting of two threads.
+   * The threads wait on a barrier, and once the barrier is met they proceed to deadlock.
+   * setUpClass then polls until the deadlock is detected (or TIMEOUT_MILLIS elapses), instead of
+   * sleeping for a fixed time that may be too short on a loaded machine.
+   *
+   * One thread holds an Object and Long lock, and is waiting on an Integer lock.
+   * The other thread holds the Integer lock and is waiting on the Long lock.
+   */
+  @BeforeClass
+  public static void setUpClass() {
+    createDeadlock();
+    waitForDeadlock();
+  }
+
+  /**
+   * Test that DeadlockInfo returns the expected values given the deadlock created by setUpClass.
+   *
+   * One thread holds an Object and Long lock, and is waiting on an Integer lock.
+   * The other thread holds the Integer lock and is waiting on the Long lock.
+   */
+  @Test
+  public void testDeadlockInfo() {
+    final DeadlockInfo deadlockInfo = new DeadlockInfo();
+
+    final ThreadInfo[] threadInfos = deadlockInfo.getDeadlockedThreads();
+    assertEquals("Expected the two test threads to be deadlocked", 2, threadInfos.length);
+
+    for (final ThreadInfo threadInfo : threadInfos) {
+      final String waitingLockString = deadlockInfo.getWaitingLockString(threadInfo);
+      assertNotNull("Each thread is expected to have a waiting lock", waitingLockString);
+      if (waitingLockString.contains("Integer")) {
+        assertNumberOfLocksHeld(2, deadlockInfo, threadInfo);
+      } else if (waitingLockString.contains("Long")) {
+        assertNumberOfLocksHeld(1, deadlockInfo, threadInfo);
+      } else {
+        fail("Unexpected waitingLockString of " + waitingLockString);
+      }
+    }
+  }
+
+  @Test
+  public void testLogDeadlockInfo() {
+    LOG.log(Level.INFO, ThreadLogger.getFormattedDeadlockInfo("Deadlock test, this deadlock is expected"));
+  }
+
+  private static void assertNumberOfLocksHeld(
+      final int expected, final DeadlockInfo deadlockInfo, final ThreadInfo threadInfo) {
+    int sum = 0;
+    for (final StackTraceElement stackTraceElement : threadInfo.getStackTrace()) {
+      sum += deadlockInfo.getMonitorLockedElements(threadInfo, stackTraceElement).size();
+    }
+    assertEquals(expected, sum);
+  }
+
+  /**
+   * Starts two daemon threads that deadlock; see setUpClass for the locks involved.
+   * The threads are daemons so that they do not keep the JVM alive after the tests finish.
+   */
+  private static void createDeadlock() {
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+
+    final Object outerLock = new Object();
+    // Fresh instances (not valueOf()) so that no other code can be synchronizing on the same objects.
+    final Integer lock1 = new Integer(0);
+    final Long lock2 = new Long(0);
+
+    final Thread thread1 = new Thread("DeadlockInfoTest-1") {
+      @Override
+      public void run() {
+        synchronized (lock1) {
+          barrierAwait(barrier);
+          lockLeaf(lock2);
+        }
+      }
+    };
+
+    final Thread thread2 = new Thread("DeadlockInfoTest-2") {
+      @Override
+      public void run() {
+        synchronized (outerLock) {
+          synchronized (lock2) {
+            barrierAwait(barrier);
+            lockLeaf(lock1);
+          }
+        }
+      }
+    };
+
+    thread1.setDaemon(true);
+    thread2.setDaemon(true);
+    thread1.start();
+    thread2.start();
+  }
+
+  /**
+   * Waits until the ThreadMXBean reports a deadlock, or until TIMEOUT_MILLIS elapses.
+   * If no deadlock forms, testDeadlockInfo fails on the number of deadlocked threads.
+   */
+  private static void waitForDeadlock() {
+    final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+    final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(TIMEOUT_MILLIS);
+    while (null == mxBean.findDeadlockedThreads() && System.nanoTime() - deadlineNanos < 0) {
+      try {
+        Thread.sleep(POLL_INTERVAL_MILLIS);
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  private static void barrierAwait(final CyclicBarrier barrier) {
+    try {
+      barrier.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+    } catch (final InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.log(Level.SEVERE, "Unexpected exception", e);
+      fail("Unexpected exception");
+    } catch (final BrokenBarrierException | TimeoutException e) {
+      LOG.log(Level.SEVERE, "Unexpected exception", e);
+      fail("Unexpected exception");
+    }
+  }
+
+  private static void lockLeaf(final Object lock) {
+    synchronized (lock) {
+      fail("The unit test failed to create a deadlock");
+    }
+  }
+}

Reply via email to