Repository: incubator-streams Updated Branches: refs/heads/master 48d54c290 -> b273496c9
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java new file mode 100644 index 0000000..60df89c --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.local.queues; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.apache.streams.util.ComponentUtils; +import org.joda.time.DateTime; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.concurrent.*; + +/** + * MultiThread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue} + */ +public class ThroughputQueueMultiThreadTest extends RandomizedTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMultiThreadTest.class); + private static final String MBEAN_ID = "testQueue"; + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); + + /** + * Remove registered mbeans from previous tests + * @throws Exception + */ + @After + public void unregisterMXBean() throws Exception { + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); + } catch (InstanceNotFoundException ife) { + //No-op + } + } + + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test + } + } + + + /** + * Test that queue will block on puts when the queue is full + * @throws InterruptedException + */ + @Test + public void testBlockOnFullQueue() throws InterruptedException { + int queueSize = randomIntBetween(1, 3000); + ExecutorService executor = Executors.newSingleThreadExecutor(); + CountDownLatch full = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + ThroughputQueue queue = new ThroughputQueue(queueSize); + BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize); + executor.submit(testThread); + full.await(); + assertEquals(queueSize, queue.size()); + assertEquals(queueSize, queue.getCurrentSize()); + assertFalse(testThread.isComplete()); //test that it is blocked + safeSleep(1000); + assertFalse(testThread.isComplete()); //still blocked + queue.take(); + finished.await(); + assertEquals(queueSize, queue.size()); + assertEquals(queueSize, queue.getCurrentSize()); + assertTrue(testThread.isComplete()); + executor.shutdownNow(); + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } + + /** + * Test that queue will block on Take when queue is empty + * @throws InterruptedException + */ + @Test + public void testBlockOnEmptyQueue() throws InterruptedException { + int queueSize = randomIntBetween(1, 3000); + ExecutorService executor = Executors.newSingleThreadExecutor(); + CountDownLatch empty = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + ThroughputQueue queue = new ThroughputQueue(); + BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue); + for(int i=0; i < queueSize; ++i) { + queue.put(i); + } + executor.submit(testThread); + empty.await(); + assertEquals(0, queue.size()); + assertEquals(0, queue.getCurrentSize()); + assertFalse(testThread.isComplete()); + safeSleep(1000); + assertFalse(testThread.isComplete()); + queue.put(1); + finished.await(); + assertEquals(0, queue.size()); + assertEquals(0, queue.getCurrentSize()); + assertTrue(testThread.isComplete()); + executor.shutdownNow(); + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } + + + /** + * Test multiple threads putting and taking from the queue while + * this thread repeatedly calls the MXBean measurement methods. + * Should hammer the queue with request from multiple threads + * of all request types. Purpose is to expose current modification exceptions + * and/or dead locks. + */ + @Test + @Repeat(iterations = 3) + public void testMultiThreadAccessAndInteruptResponse() throws Exception { + int putTakeThreadCount = randomIntBetween(1, 10); + int dataCount = randomIntBetween(1, 2000000); + int pollCount = randomIntBetween(1, 2000000); + int maxSize = randomIntBetween(1, 1000); + CountDownLatch finished = new CountDownLatch(putTakeThreadCount); + ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID); + ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2); + for(int i=0; i < putTakeThreadCount; ++i) { + executor.submit(new PutData(finished, queue, dataCount)); + executor.submit(new TakeData(queue)); + } + for(int i=0; i < pollCount; ++i) { + queue.getAvgWait(); + queue.getAdded(); + queue.getCurrentSize(); + queue.getMaxWait(); + queue.getRemoved(); + queue.getThroughput(); + } + finished.await(); + while(!queue.isEmpty()) { + LOGGER.info("Waiting for queue to be emptied..."); + safeSleep(500); + } + long totalData = ((long) dataCount) * putTakeThreadCount; + assertEquals(totalData, queue.getAdded()); + assertEquals(totalData, queue.getRemoved()); + executor.shutdown(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts + executor.shutdownNow(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes + //Randomized should not report thread leak + } + + + + private void safeSleep(long sleep) { + try { + Thread.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + + + + /** + * Helper runnable for test {@link ThroughputQueueMultiThreadTest#testBlockOnFullQueue()} + */ + private class BlocksOnFullQueue implements Runnable { + + private CountDownLatch full; + volatile private boolean complete; + private int queueSize; + private CountDownLatch finished; + private BlockingQueue queue; + + public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) { + this.full = latch; + this.complete = false; + this.queueSize = queueSize; + this.finished = finished; + this.queue = queue; + } + + @Override + public void run() { + try { + for (int i = 0; i < this.queueSize; ++i) { + this.queue.put(i); + } + this.full.countDown(); + this.queue.put(0); + this.complete = true; + this.finished.countDown(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + public boolean isComplete() { + return this.complete; + } + } + + + /** + * Helper runnable class for test {@link ThroughputQueueMultiThreadTest#testBlockOnEmptyQueue()} + */ + private class BlocksOnEmptyQueue implements Runnable { + + private CountDownLatch full; + volatile private boolean complete; + private int queueSize; + private CountDownLatch finished; + private BlockingQueue queue; + + public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) { + this.full = full; + this.finished = finished; + this.queueSize = queueSize; + this.queue = queue; + this.complete = false; + } + + + @Override + public void run() { + try { + for(int i=0; i < this.queueSize; ++i) { + this.queue.take(); + } + this.full.countDown(); + this.queue.take(); + this.complete = true; + this.finished.countDown(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + public boolean isComplete() { + return this.complete; + } + } + + + private class PutData implements Runnable { + + private BlockingQueue queue; + private int dataCount; + private CountDownLatch finished; + + public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) { + this.queue = queue; + this.dataCount = dataCount; + this.finished = finished; + } + + + @Override + public void run() { + try { + for(int i=0; i < this.dataCount; ++i) { + this.queue.put(i); + } + } catch (InterruptedException ie) { + LOGGER.error("PUT DATA interupted !"); + Thread.currentThread().interrupt(); + } + this.finished.countDown(); + } + } + + + private class TakeData implements Runnable { + + private BlockingQueue queue; + + public TakeData(BlockingQueue queue) { + this.queue = queue; + } + + + @Override + public void run() { + try { + while(true) { + this.queue.take(); + } + } catch (InterruptedException ie) { + LOGGER.error("PUT DATA interupted !"); + Thread.currentThread().interrupt(); + } + } + } + +}
