STORM-756 Add unit tests of ShellBoltMessageQueue
// Provenance (from the commit notification this file was extracted from):
//   Project: http://git-wip-us.apache.org/repos/asf/storm/repo
//   Commit:  http://git-wip-us.apache.org/repos/asf/storm/commit/117e8186
//   Tree:    http://git-wip-us.apache.org/repos/asf/storm/tree/117e8186
//   Diff:    http://git-wip-us.apache.org/repos/asf/storm/diff/117e8186
//   Branch:  refs/heads/master
//   Commit:  117e81861485ab02d4b97de725309132fce3f2a1 (parents: 56dc7b9)
//   Author / Committer: Jungtaek Lim <[email protected]>, Tue Nov 24 19:14:05 2015 +0900
//   File:    storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
//            (new file, mode 100644, 67 insertions in the original commit)
package backtype.storm.utils;

import backtype.storm.multilang.BoltMsg;
import com.google.common.collect.Lists;
import junit.framework.TestCase;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Unit tests for {@link ShellBoltMessageQueue#poll(long, TimeUnit)}.
 *
 * <p>The tests check three things: queued task ids are returned ahead of a
 * previously queued bolt message, polling an empty queue yields {@code null}
 * only after the timeout has elapsed, and a poll returns before its timeout
 * when another thread supplies data.
 */
public class ShellBoltMessageQueueTest extends TestCase {

    /**
     * Task ids must be handed out before a bolt message, even when the bolt
     * message was enqueued first.
     *
     * @throws InterruptedException if the polling thread is interrupted
     */
    @Test
    public void testPollTaskIdsFirst() throws InterruptedException {
        ShellBoltMessageQueue queue = new ShellBoltMessageQueue();

        // put bolt message first, then put task ids
        queue.putBoltMsg(new BoltMsg());
        List<Integer> taskIds = Lists.newArrayList(1, 2, 3);
        queue.putTaskIds(taskIds);

        Object msg = queue.poll(10, TimeUnit.SECONDS);

        // task ids should be pulled first
        assertTrue(msg instanceof List<?>);
        assertEquals(taskIds, msg);
    }

    /**
     * Polling an empty queue must return {@code null}, and not before the
     * requested timeout has passed.
     *
     * @throws InterruptedException if the polling thread is interrupted
     */
    @Test
    public void testPollWhileThereAreNoDataAvailable() throws InterruptedException {
        ShellBoltMessageQueue queue = new ShellBoltMessageQueue();

        // nanoTime is monotonic, so the measurement cannot be skewed by
        // wall-clock adjustments or coarse millisecond granularity.
        long start = System.nanoTime();
        Object msg = queue.poll(1, TimeUnit.SECONDS);
        long elapsedNanos = System.nanoTime() - start;

        assertNull(msg);
        // ">=" rather than ">": a poll that waits exactly its timeout is correct.
        assertTrue("poll returned after only " + elapsedNanos + " ns",
                elapsedNanos >= TimeUnit.SECONDS.toNanos(1));
    }

    /**
     * A poll with a long timeout must return as soon as another thread makes
     * data available, instead of waiting for the full timeout.
     *
     * @throws InterruptedException if the polling thread is interrupted
     */
    @Test
    public void testPollShouldReturnASAPWhenDataAvailable() throws InterruptedException {
        final ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
        final List<Integer> taskIds = Lists.newArrayList(1, 2, 3);

        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the thread publishes and exits
                    // right away, so nothing would observe the interrupt.
                }

                queue.putTaskIds(taskIds);
            }
        });
        producer.start();

        long start = System.nanoTime();
        Object msg = queue.poll(10, TimeUnit.SECONDS);
        long elapsedNanos = System.nanoTime() - start;

        // Do not let the helper thread outlive the test; the bound keeps a
        // misbehaving producer from hanging the suite.
        producer.join(TimeUnit.SECONDS.toMillis(10));

        assertEquals(taskIds, msg);
        assertTrue("poll took " + elapsedNanos + " ns",
                elapsedNanos < TimeUnit.SECONDS.toNanos(10));
    }
}
