Repository: ambari Updated Branches: refs/heads/branch-2.1 d48bf1ea1 -> fff4d7368
AMBARI-11692. ClusterDeadlockTest unit test fails.(vbrodetskyi) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fff4d736 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fff4d736 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fff4d736 Branch: refs/heads/branch-2.1 Commit: fff4d7368bf49abb29815734bf86e4ba9e0101ae Parents: d48bf1e Author: Vitaly Brodetskyi <[email protected]> Authored: Thu Jun 11 06:50:43 2015 +0300 Committer: Vitaly Brodetskyi <[email protected]> Committed: Thu Jun 11 06:50:43 2015 +0300 ---------------------------------------------------------------------- .../state/cluster/ClusterDeadlockTest.java | 48 ++++- .../server/testing/DeadlockWarningThread.java | 135 +++++++++++++ .../server/testing/DeadlockedThreadsTest.java | 188 +++++++++++++++++++ 3 files changed, 361 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/fff4d736/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java index 2f064ab..08f9743 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.state.cluster; +import org.apache.ambari.server.testing.DeadlockWarningThread; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -143,7 +144,7 @@ public class ClusterDeadlockTest { * * @throws Exception */ - @Test(timeout = 30000) + @Test() public void 
testDeadlockBetweenImplementations() throws Exception { Service service = cluster.getService("HDFS"); ServiceComponent nameNodeComponent = service.getServiceComponent("NAMENODE"); @@ -168,8 +169,17 @@ public class ClusterDeadlockTest { threads.add(thread); } - for (Thread thread : threads) { - thread.join(); + DeadlockWarningThread wt = new DeadlockWarningThread(threads); + + while (true) { + if(!wt.isAlive()) { + break; + } + } + if (wt.isDeadlocked()){ + Assert.assertFalse(wt.getErrorMessages().toString(), wt.isDeadlocked()); + } else { + Assert.assertFalse(wt.isDeadlocked()); } } @@ -179,7 +189,7 @@ public class ClusterDeadlockTest { * * @throws Exception */ - @Test(timeout = 35000) + @Test() public void testAddingHostComponentsWhileReading() throws Exception { Service service = cluster.getService("HDFS"); ServiceComponent nameNodeComponent = service.getServiceComponent("NAMENODE"); @@ -194,8 +204,17 @@ public class ClusterDeadlockTest { threads.add(thread); } - for (Thread thread : threads) { - thread.join(); + DeadlockWarningThread wt = new DeadlockWarningThread(threads); + + while (true) { + if(!wt.isAlive()) { + break; + } + } + if (wt.isDeadlocked()){ + Assert.assertFalse(wt.getErrorMessages().toString(), wt.isDeadlocked()); + } else { + Assert.assertFalse(wt.isDeadlocked()); } } @@ -205,7 +224,7 @@ public class ClusterDeadlockTest { * * @throws Exception */ - @Test(timeout = 75000) + @Test() public void testDeadlockWhileRestartingComponents() throws Exception { // for each host, install both components List<ServiceComponentHost> serviceComponentHosts = new ArrayList<ServiceComponentHost>(); @@ -236,9 +255,18 @@ public class ClusterDeadlockTest { clusterWriterThread.start(); schWriterThread.start(); } - - for (Thread thread : threads) { - thread.join(); + + DeadlockWarningThread wt = new DeadlockWarningThread(threads); + + while (true) { + if(!wt.isAlive()) { + break; + } + } + if (wt.isDeadlocked()){ + Assert.assertFalse(wt.getErrorMessages().toString(), 
wt.isDeadlocked()); + } else { + Assert.assertFalse(wt.isDeadlocked()); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/fff4d736/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java new file mode 100644 index 0000000..b1237df --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * +* http://www.apache.org/licenses/LICENSE-2.0 + * +* Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.ambari.server.testing; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * + * Monitoring of deadlocks thread + * Please note. 
This class can not be used outside of tests + */ +public class DeadlockWarningThread extends Thread { + + private Thread parentThread; + private final List<String> errorMessages; + private static final int MAX_STACK_DEPTH = 30; + private Collection<Thread> monitoredThreads = null; + private boolean deadlocked = false; + private static final ThreadMXBean mbean = ManagementFactory.getThreadMXBean(); + private String stacktrace = ""; + + public List<String> getErrorMessages() { + return errorMessages; + } + + public boolean isDeadlocked() { + return deadlocked; + } + + public DeadlockWarningThread(Collection<Thread> monitoredThreads) { + this.errorMessages = new ArrayList<String>(); + this.monitoredThreads = monitoredThreads; + parentThread = Thread.currentThread(); + start(); + } + + public String getThreadsStacktraces(long[] ids) { + StringBuilder errBuilder = new StringBuilder(); + for (long id : ids) { + ThreadInfo ti = mbean.getThreadInfo(id, MAX_STACK_DEPTH); + errBuilder.append("Deadlocked Thread:\n"). + append("------------------\n"). 
+ append(ti).append('\n'); + for (StackTraceElement ste : ti.getStackTrace()) { + errBuilder.append('\t').append(ste); + } + errBuilder.append('\n'); + } + return errBuilder.toString(); + } + + + @Override + public void run() { + while (true) { + try { + Thread.sleep(3000); + } catch (InterruptedException ex) { + } + long[] ids = mbean.findMonitorDeadlockedThreads(); + StringBuilder errBuilder = new StringBuilder(); + if (ids != null && ids.length > 0) { + errBuilder.append(getThreadsStacktraces(ids)); + errorMessages.add(errBuilder.toString()); + System.out.append(errBuilder.toString()); + //Exit if deadlocks have been found + deadlocked = true; + break; + } else { + //Exit if all monitored threads were finished + boolean hasLive = false; + Set<Thread> activeThreads = new HashSet<Thread>(); + for (Thread monTh : monitoredThreads) { + ThreadGroup group = monTh.getThreadGroup(); + Thread[] groupThreads = new Thread[group.activeCount()]; + group.enumerate(groupThreads, true); + activeThreads.addAll(Arrays.asList(groupThreads)); + } + activeThreads.remove(Thread.currentThread()); + activeThreads.remove(parentThread); + Set<Long> idSet = new TreeSet<Long>(); + for (Thread activeThread : activeThreads) { + if (activeThread.isAlive()) { + hasLive = true; + idSet.add(activeThread.getId()); + } + } + long[] tid = new long[idSet.size()]; + if (!hasLive) { + deadlocked = false; + break; + } else { + int cnt = 0; + for (Long id : idSet) { + tid[cnt] = id; + cnt++; + } + String currentStackTrace = getThreadsStacktraces(tid); + if (stacktrace.equals(currentStackTrace)) { + errBuilder.append(currentStackTrace); + errorMessages.add(currentStackTrace); + System.out.append(currentStackTrace); + deadlocked = true; + break; + } else { + stacktrace = currentStackTrace; + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fff4d736/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockedThreadsTest.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockedThreadsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockedThreadsTest.java new file mode 100644 index 0000000..6ce3acc --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockedThreadsTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * +* http://www.apache.org/licenses/LICENSE-2.0 + * +* Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.ambari.server.testing; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.junit.Assert; +import org.junit.Test; + +/** + * + * Tests whether DeadlockWarningThread detects deadlocks properly + */ +public class DeadlockedThreadsTest { + static Set<Thread> threads = new HashSet<Thread>(); + + /** + * + * Test should detect "flat" deadlock + * This test is disabled (no @Test annotation) because it does not test any production code. + * If DeadlockWarningThread is changed and the changes need testing, + * add the @Test annotation here. + */ + public void testDeadlocks() { + + // deadlock with three locks + Object lock1 = new String("lock1"); + Object lock2 = new String("lock2"); + Object lock3 = new String("lock3"); + + + threads.add(new DeadlockingThread("t1", lock1, lock2)); + threads.add(new DeadlockingThread("t2", lock2, lock3)); + threads.add(new DeadlockingThread("t3", lock3, lock1)); + + // deadlock with two locks + Object lock4 = new String("lock4"); + Object lock5 = new String("lock5"); + + threads.add(new DeadlockingThread("t4", lock4, lock5)); + threads.add(new DeadlockingThread("t5", lock5, lock4)); + DeadlockWarningThread wt = new DeadlockWarningThread(threads); + + + + while (true) { + if(!wt.isAlive()) { + break; + } + } + if (wt.isDeadlocked()){ + Assert.assertTrue(wt.getErrorMessages().toString(), wt.isDeadlocked()); + Assert.assertFalse(wt.getErrorMessages().toString().equals("")); + } else { + Assert.assertTrue(wt.getErrorMessages().toString(), wt.isDeadlocked()); + } + + } + + /** + * + * Test should detect "hidden" deadlock + * This test is disabled (no @Test annotation) because it does not test any production code. + * If DeadlockWarningThread is changed and the changes need testing, + * add the @Test annotation here. + */ + public void testReadWriteDeadlocks() { + + // deadlock with three locks + Object lock1 = new String("lock1"); + Object lock2 = 
new String("lock2"); + Object lock3 = new String("lock3"); + + + threads.add(new DeadlockingThreadReadWriteLock("t1", lock1, lock2)); + threads.add(new DeadlockingThreadReadWriteLock("t2", lock2, lock3)); + threads.add(new DeadlockingThreadReadWriteLock("t3", lock3, lock1)); + + // deadlock with two locks + Object lock4 = new String("lock4"); + Object lock5 = new String("lock5"); + + threads.add(new DeadlockingThreadReadWriteLock("t4", lock4, lock5)); + threads.add(new DeadlockingThreadReadWriteLock("t5", lock5, lock4)); + DeadlockWarningThread wt = new DeadlockWarningThread(threads); + + + + while (true) { + if(!wt.isAlive()) { + break; + } + } + if (wt.isDeadlocked()){ + Assert.assertTrue(wt.getErrorMessages().toString(), wt.isDeadlocked()); + Assert.assertFalse(wt.getErrorMessages().toString().equals("")); + } else { + Assert.assertTrue(wt.getErrorMessages().toString(), wt.isDeadlocked()); + } + + } + + + /** + * There is absolutely nothing you can do when you have + * deadlocked threads. You cannot stop them, you cannot + * interrupt them, you cannot tell them to stop trying to + * get a lock, and you also cannot tell them to let go of + * the locks that they own. + */ + private static class DeadlockingThread extends Thread { + private final Object lock1; + private final Object lock2; + + public DeadlockingThread(String name, Object lock1, Object lock2) { + super(name); + this.lock1 = lock1; + this.lock2 = lock2; + start(); + } + public void run() { + while (true) { + f(); + } + } + private void f() { + synchronized (lock1) { + g(); + } + } + private void g() { + synchronized (lock2) { + // do some work... 
+ for (int i = 0; i < 1000 * 1000; i++) ; + } + } + } + + private static class DeadlockingThreadReadWriteLock extends Thread { + private final Object lock1; + private final Object lock2; + private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); + public final Lock r = rwl.readLock(); + public final Lock w = rwl.writeLock(); + + public DeadlockingThreadReadWriteLock(String name, Object lock1, Object lock2) { + super(name); + this.lock1 = lock1; + this.lock2 = lock2; + start(); + } + public void run() { + while (true) { + f(); + } + } + private void f() { + w.lock(); { + g(); + } w.unlock(); + } + + private void g() { + r.lock(); { + // do some work... + for (int i = 0; i < 1000 * 1000; i++) ; + } r.unlock(); + } + } + +} \ No newline at end of file
