Fix WaitQueueTest flakiness
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1bb7926 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1bb7926 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1bb7926 Branch: refs/heads/cassandra-2.2 Commit: e1bb79260a9bdf478895724ea180cf0c2efb37ff Parents: 20f12e9 Author: Benedict Elliott Smith <[email protected]> Authored: Thu Aug 6 14:45:54 2015 +0200 Committer: Benedict Elliott Smith <[email protected]> Committed: Thu Aug 6 14:45:54 2015 +0200 ---------------------------------------------------------------------- test/unit/org/apache/cassandra/Util.java | 5 ++ .../cassandra/concurrent/WaitQueueTest.java | 91 ++++++-------------- 2 files changed, 32 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 1015be6..e05468f 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -376,4 +376,9 @@ public class Util Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish)); return new RangeTombstone(startName, endName, timestamp , localtime); } + + public static void joinThread(Thread thread) throws InterruptedException + { + thread.join(10000); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java index 3e7cb7b..fdc6880 100644 --- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java +++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java @@ -21,10 +21,13 @@ package org.apache.cassandra.concurrent; */ +import org.apache.cassandra.Util; import org.apache.cassandra.utils.concurrent.WaitQueue; import org.junit.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; @@ -38,6 +41,7 @@ public class WaitQueueTest } public void testSerial(final WaitQueue queue) throws InterruptedException { + final AtomicInteger ready = new AtomicInteger(); Thread[] ts = new Thread[4]; for (int i = 0 ; i < ts.length ; i++) ts[i] = new Thread(new Runnable() @@ -46,6 +50,7 @@ public class WaitQueueTest public void run() { WaitQueue.Signal wait = queue.register(); + ready.incrementAndGet(); try { wait.await(); @@ -55,68 +60,28 @@ public class WaitQueueTest } } }); - for (int i = 0 ; i < ts.length ; i++) - ts[i].start(); - Thread.sleep(100); - queue.signal(); - queue.signal(); - queue.signal(); - queue.signal(); - for (int i = 0 ; i < ts.length ; i++) + for (Thread t : ts) + t.start(); + final ThreadLocalRandom random = ThreadLocalRandom.current(); + while (ready.get() < ts.length) + random.nextLong(); + for (Thread t : ts) + queue.signal(); + for (Thread t : ts) { - ts[i].join(100); - assertFalse(queue.getClass().getName(), ts[i].isAlive()); + Util.joinThread(t); + assertFalse(queue.getClass().getName(), t.isAlive()); } } - - @Test - public void testCondition1() throws InterruptedException - { - testCondition1(new WaitQueue()); - } - - public void testCondition1(final WaitQueue queue) throws InterruptedException - { - final AtomicBoolean cond1 = new AtomicBoolean(false); - final AtomicBoolean fail = new AtomicBoolean(false); - Thread t1 = new Thread(new Runnable() - { - @Override - public void run() - { - try - { - Thread.sleep(200); - } catch (InterruptedException e) - { - e.printStackTrace(); - } - WaitQueue.Signal wait = queue.register(); - if (!cond1.get()) - { - System.err.println("Condition should have already been met"); - fail.set(true); - } - } - }); - t1.start(); - Thread.sleep(50); - cond1.set(true); - Thread.sleep(300); - queue.signal(); - t1.join(300); - assertFalse(queue.getClass().getName(), t1.isAlive()); - assertFalse(fail.get()); - } - @Test - public void testCondition2() throws InterruptedException + public void testCondition() throws InterruptedException { - testCondition2(new WaitQueue()); + testCondition(new WaitQueue()); } - public void testCondition2(final WaitQueue queue) throws InterruptedException + public void testCondition(final WaitQueue queue) throws InterruptedException { + final AtomicBoolean ready = new AtomicBoolean(false); final AtomicBoolean condition = new AtomicBoolean(false); final AtomicBoolean fail = new AtomicBoolean(false); Thread t = new Thread(new Runnable() @@ -129,16 +94,12 @@ public class WaitQueueTest { System.err.println(""); fail.set(true); + ready.set(true); + return; } - try - { - Thread.sleep(200); - wait.await(); - } catch (InterruptedException e) - { - e.printStackTrace(); - } + ready.set(true); + wait.awaitUninterruptibly(); if (!condition.get()) { System.err.println("Woke up when condition not met"); @@ -147,10 +108,12 @@ public class WaitQueueTest } }); t.start(); - Thread.sleep(50); + final ThreadLocalRandom random = ThreadLocalRandom.current(); + while (!ready.get()) + random.nextLong(); condition.set(true); queue.signal(); - t.join(300); + Util.joinThread(t); assertFalse(queue.getClass().getName(), t.isAlive()); assertFalse(fail.get()); }
