Author: todd
Date: Tue May 17 01:08:08 2011
New Revision: 1103971
URL: http://svn.apache.org/viewvc?rev=1103971&view=rev
Log:
HADOOP-7292. Fix racy test case TestSinkQueue. Contributed by Luke Lu.
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/test/core/org/apache/hadoop/metrics2/impl/TestSinkQueue.java
Modified: hadoop/common/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1103971&r1=1103970&r2=1103971&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue May 17 01:08:08 2011
@@ -224,6 +224,8 @@ Trunk (unreleased changes)
HADOOP-7290. Unit test failure in
TestUserGroupInformation.testGetServerSideGroups. (Trevor Robison via eli)
+ HADOOP-7292. Fix racy test case TestSinkQueue. (Luke Lu via todd)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified:
hadoop/common/trunk/src/test/core/org/apache/hadoop/metrics2/impl/TestSinkQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/metrics2/impl/TestSinkQueue.java?rev=1103971&r1=1103970&r2=1103971&view=diff
==============================================================================
---
hadoop/common/trunk/src/test/core/org/apache/hadoop/metrics2/impl/TestSinkQueue.java
(original)
+++
hadoop/common/trunk/src/test/core/org/apache/hadoop/metrics2/impl/TestSinkQueue.java
Tue May 17 01:08:08 2011
@@ -19,6 +19,7 @@
package org.apache.hadoop.metrics2.impl;
import java.util.ConcurrentModificationException;
+import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -32,7 +33,7 @@ import static org.apache.hadoop.metrics2
* Test the half-blocking metrics sink queue
*/
public class TestSinkQueue {
- private final Log LOG = LogFactory.getLog(TestSinkQueue.class);
+ private static final Log LOG = LogFactory.getLog(TestSinkQueue.class);
/**
* Test common use case
@@ -63,6 +64,11 @@ public class TestSinkQueue {
* @throws Exception
*/
@Test public void testEmptyBlocking() throws Exception {
+ testEmptyBlocking(0);
+ testEmptyBlocking(100);
+ }
+
+ private void testEmptyBlocking(int awhile) throws Exception {
final SinkQueue<Integer> q = new SinkQueue<Integer>(2);
final Runnable trigger = mock(Runnable.class);
// try consuming emtpy equeue and blocking
@@ -83,7 +89,10 @@ public class TestSinkQueue {
}
};
t.start();
- Thread.yield(); // Let the other block
+ // Should work with or without sleep
+ if (awhile > 0) {
+ Thread.sleep(awhile);
+ }
q.enqueue(1);
q.enqueue(2);
t.join();
@@ -228,22 +237,26 @@ public class TestSinkQueue {
LOG.info(e);
return;
}
- fail("should've thrown");
+ LOG.error("should've thrown CME");
+ fail("should've thrown CME");
}
private SinkQueue<Integer> newSleepingConsumerQueue(int capacity,
- int... values) {
+ int... values) throws Exception {
final SinkQueue<Integer> q = new SinkQueue<Integer>(capacity);
for (int i : values) {
q.enqueue(i);
}
+ final CountDownLatch barrier = new CountDownLatch(1);
Thread t = new Thread() {
@Override public void run() {
try {
+ Thread.sleep(10); // causes failure without barrier
q.consume(new Consumer<Integer>() {
@Override
public void consume(Integer e) throws InterruptedException {
LOG.info("sleeping");
+ barrier.countDown();
Thread.sleep(1000 * 86400); // a long time
}
});
@@ -256,7 +269,7 @@ public class TestSinkQueue {
t.setName("Sleeping consumer");
t.setDaemon(true); // so jvm can exit
t.start();
- Thread.yield(); // Let the consumer consume
+ barrier.await();
LOG.debug("Returning new sleeping consumer queue");
return q;
}