Author: todd
Date: Mon Jul 25 21:11:39 2011
New Revision: 1150912
URL: http://svn.apache.org/viewvc?rev=1150912&view=rev
Log:
HADOOP-7298. Add test utility for writing multi-threaded tests. Contributed by
Todd Lipcon and Harsh J Chouraria.
Added:
hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java
hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
Modified:
hadoop/common/branches/branch-0.22/common/CHANGES.txt
Modified: hadoop/common/branches/branch-0.22/common/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/common/CHANGES.txt?rev=1150912&r1=1150911&r2=1150912&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/common/CHANGES.txt Mon Jul 25 21:11:39
2011
@@ -263,6 +263,9 @@ Release 0.22.0 - Unreleased
HADOOP-7106. Reorganize project SVN layout to "unsplit" the projects.
(todd, nigel)
+ HADOOP-7298. Add test utility for writing multi-threaded tests. (todd and
+ Harsh J Chouraria via todd)
+
OPTIMIZATIONS
HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
Added:
hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1150912&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java
(added)
+++
hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java
Mon Jul 25 21:11:39 2011
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A utility to easily test threaded/synchronized code.
+ * Utility works by letting you add threads that do some work to a
+ * test context object, and then lets you kick them all off to stress test
+ * your parallel code.
+ *
+ * Also propagates thread exceptions back to the runner, to let you verify.
+ *
+ * An example:
+ *
+ * <code>
+ * final AtomicInteger threadsRun = new AtomicInteger();
+ *
+ * TestContext ctx = new TestContext();
+ * // Add 3 threads to test.
+ * for (int i = 0; i < 3; i++) {
+ * ctx.addThread(new TestingThread(ctx) {
+ * @Override
+ * public void doWork() throws Exception {
+ * threadsRun.incrementAndGet();
+ * }
+ * });
+ * }
+ * ctx.startThreads();
+ * // Set a timeout period for threads to complete.
+ * ctx.waitFor(30000);
+ * assertEquals(3, threadsRun.get());
+ * </code>
+ *
+ * For repetitive actions, use the
+ * {@link MultithreadedTestUtil.RepeatingTestThread} instead.
+ *
+ * (More examples can be found in {@link TestMultithreadedTestUtil})
+ */
+public abstract class MultithreadedTestUtil {
+
+ public static final Log LOG =
+ LogFactory.getLog(MultithreadedTestUtil.class);
+
+ /**
+ * TestContext is used to setup the multithreaded test runner.
+ * It lets you add threads, run them, wait upon or stop them.
+ */
+ public static class TestContext {
+ private Throwable err = null;
+ private boolean stopped = false;
+ private Set<TestingThread> testThreads = new HashSet<TestingThread>();
+ private Set<TestingThread> finishedThreads = new HashSet<TestingThread>();
+
+ /**
+ * Check if the context can run threads.
+ * Can't if its been stopped and contains an error.
+ * @return true if it can run, false if it can't.
+ */
+ public synchronized boolean shouldRun() {
+ return !stopped && err == null;
+ }
+
+ /**
+ * Add a thread to the context for running.
+ * Threads can be of type {@link MultithreadedTestUtil.TestingThread}
+ * or {@link MultithreadedTestUtil.RepeatingTestThread}
+ * or other custom derivatives of the former.
+ * @param t the thread to add for running.
+ */
+ public void addThread(TestingThread t) {
+ testThreads.add(t);
+ }
+
+ /**
+ * Starts all test threads that have been added so far.
+ */
+ public void startThreads() {
+ for (TestingThread t : testThreads) {
+ t.start();
+ }
+ }
+
+ /**
+ * Waits for threads to finish or error out.
+ * @param millis the number of milliseconds to wait
+ * for threads to complete.
+ * @throws Exception if one or more of the threads
+ * have thrown up an error.
+ */
+ public synchronized void waitFor(long millis) throws Exception {
+ long endTime = System.currentTimeMillis() + millis;
+ while (shouldRun() &&
+ finishedThreads.size() < testThreads.size()) {
+ long left = endTime - System.currentTimeMillis();
+ if (left <= 0) break;
+ checkException();
+ wait(left);
+ }
+ checkException();
+ }
+
+ /**
+ * Checks for thread exceptions, and if they've occurred
+ * throws them as RuntimeExceptions in a deferred manner.
+ */
+ private synchronized void checkException() throws Exception {
+ if (err != null) {
+ throw new RuntimeException("Deferred", err);
+ }
+ }
+
+ /**
+ * Called by {@link MultithreadedTestUtil.TestingThread}s to signal
+ * a failed thread.
+ * @param t the thread that failed.
+ */
+ public synchronized void threadFailed(Throwable t) {
+ if (err == null) err = t;
+ LOG.error("Failed!", err);
+ notify();
+ }
+
+ /**
+ * Called by {@link MultithreadedTestUtil.TestingThread}s to signal
+ * a successful completion.
+ * @param t the thread that finished.
+ */
+ public synchronized void threadDone(TestingThread t) {
+ finishedThreads.add(t);
+ notify();
+ }
+
+ /**
+ * Returns after stopping all threads by joining them back.
+ * @throws Exception in case a thread terminated with a failure.
+ */
+ public void stop() throws Exception {
+ synchronized (this) {
+ stopped = true;
+ }
+ for (TestingThread t : testThreads) {
+ t.join();
+ }
+ checkException();
+ }
+ }
+
+ /**
+  * A thread that can be added to a test context, and properly
+  * passes exceptions through.
+  */
+ public static abstract class TestingThread extends Thread {
+   protected final TestContext ctx;
+   // volatile: set through stopTestThread() and polled by the running
+   // thread without any lock, so the write must be visible across threads.
+   protected volatile boolean stopped;
+
+   /**
+    * @param ctx the context that this thread reports failure and
+    * completion to.
+    */
+   public TestingThread(TestContext ctx) {
+     this.ctx = ctx;
+   }
+
+   /**
+    * Runs {@link #doWork()}, reporting anything it throws to the context
+    * through {@link TestContext#threadFailed(Throwable)}, and then always
+    * reports completion through {@link TestContext#threadDone(TestingThread)}.
+    */
+   @Override
+   public void run() {
+     try {
+       doWork();
+     } catch (Throwable t) {
+       ctx.threadFailed(t);
+     }
+     ctx.threadDone(this);
+   }
+
+   /**
+    * User method to add any code to test thread behavior of.
+    * @throws Exception throw an exception if a failure has occurred.
+    */
+   public abstract void doWork() throws Exception;
+
+   /**
+    * Sets this thread's {@code stopped} flag.
+    */
+   protected void stopTestThread() {
+     this.stopped = true;
+   }
+ }
+
+ /**
+  * A test thread that keeps performing one operation over and over.
+  */
+ public static abstract class RepeatingTestThread extends TestingThread {
+   public RepeatingTestThread(TestContext ctx) {
+     super(ctx);
+   }
+
+   /**
+    * Calls {@link #doAnAction()} in a loop. Before each call it checks
+    * that the context still reports it should run and that this thread
+    * has not been flagged as stopped; it returns as soon as either check
+    * fails.
+    */
+   @Override
+   public final void doWork() throws Exception {
+     while (true) {
+       if (!ctx.shouldRun() || stopped) {
+         return;
+       }
+       doAnAction();
+     }
+   }
+
+   /**
+    * User method for any code to test repeating behavior of (as threads).
+    * @throws Exception throw an exception if a failure has occurred.
+    */
+   public abstract void doAnAction() throws Exception;
+ }
+}
Added:
hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java?rev=1150912&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
(added)
+++
hadoop/common/branches/branch-0.22/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
Mon Jul 25 21:11:39 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+
+/**
+ * Tests for {@link MultithreadedTestUtil}: normal completion, failure
+ * propagation from worker threads, and repeating threads.
+ */
+public class TestMultithreadedTestUtil {
+
+  private static final String FAIL_MSG =
+    "Inner thread fails an assert";
+
+  /**
+   * Asserts that a wait returned well before its 30 second timeout.
+   * @param elapsedMillis how long the wait actually took.
+   */
+  private static void assertReturnedEarly(long elapsedMillis) {
+    assertTrue("Test took " + elapsedMillis + "ms",
+        elapsedMillis < 5000);
+  }
+
+  /**
+   * Starts the context's threads and checks that waitFor() promptly
+   * rethrows the worker's failure wrapped in a RuntimeException.
+   * @param ctx context whose threads have been added but not started.
+   * @param expectedCauseMessage message expected on the wrapped cause.
+   */
+  private static void assertWaitForRethrows(TestContext ctx,
+      String expectedCauseMessage) throws Exception {
+    ctx.startThreads();
+    long begin = System.currentTimeMillis();
+    try {
+      ctx.waitFor(30000);
+      fail("waitFor did not throw");
+    } catch (RuntimeException deferred) {
+      // expected
+      assertEquals(expectedCauseMessage, deferred.getCause().getMessage());
+    }
+    long finish = System.currentTimeMillis();
+    // The wait shouldn't have lasted the full 30 seconds, since
+    // the thread throws faster than that
+    assertReturnedEarly(finish - begin);
+  }
+
+  @Test
+  public void testNoErrors() throws Exception {
+    final AtomicInteger threadsRun = new AtomicInteger();
+    TestContext ctx = new TestContext();
+
+    int remaining = 3;
+    while (remaining-- > 0) {
+      ctx.addThread(new TestingThread(ctx) {
+        @Override
+        public void doWork() throws Exception {
+          threadsRun.incrementAndGet();
+        }
+      });
+    }
+    // Nothing may run before the threads are started
+    assertEquals(0, threadsRun.get());
+
+    ctx.startThreads();
+    long begin = System.currentTimeMillis();
+    ctx.waitFor(30000);
+    long finish = System.currentTimeMillis();
+
+    // Every thread should have run
+    assertEquals(3, threadsRun.get());
+    // The wait shouldn't have lasted the full 30 seconds, since
+    // the threads exited faster than that.
+    assertReturnedEarly(finish - begin);
+  }
+
+  @Test
+  public void testThreadFails() throws Exception {
+    TestContext ctx = new TestContext();
+    ctx.addThread(new TestingThread(ctx) {
+      @Override
+      public void doWork() throws Exception {
+        fail(FAIL_MSG);
+      }
+    });
+    assertWaitForRethrows(ctx, FAIL_MSG);
+  }
+
+  @Test
+  public void testThreadThrowsCheckedException() throws Exception {
+    TestContext ctx = new TestContext();
+    ctx.addThread(new TestingThread(ctx) {
+      @Override
+      public void doWork() throws Exception {
+        throw new IOException("my ioe");
+      }
+    });
+    assertWaitForRethrows(ctx, "my ioe");
+  }
+
+  @Test
+  public void testRepeatingThread() throws Exception {
+    final AtomicInteger counter = new AtomicInteger();
+
+    TestContext ctx = new TestContext();
+    ctx.addThread(new RepeatingTestThread(ctx) {
+      @Override
+      public void doAnAction() throws Exception {
+        counter.incrementAndGet();
+      }
+    });
+    ctx.startThreads();
+    long begin = System.currentTimeMillis();
+    ctx.waitFor(3000);
+    ctx.stop();
+    long elapsed = System.currentTimeMillis() - begin;
+
+    // The wait should have lasted just about 3 seconds
+    assertTrue("Test took " + elapsed + "ms",
+        Math.abs(elapsed - 3000) < 500);
+    // stop() has joined the worker, so the counter no longer changes.
+    // It should have been incremented lots of times in 3 full seconds
+    int count = counter.get();
+    assertTrue("Counter value = " + count,
+        count > 1000);
+  }
+
+}