Repository: incubator-streams Updated Branches: refs/heads/master a7a40125d -> a973ba217
Added ThroughputQueue and Tests Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/95033095 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/95033095 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/95033095 Branch: refs/heads/master Commit: 950330952a4aab51e7f273c27e8dad2eafeed0b5 Parents: c483941 Author: Ryan Ebanks <[email protected]> Authored: Mon Oct 6 15:38:37 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Mon Oct 6 15:38:37 2014 -0500 ---------------------------------------------------------------------- .../streams/local/queues/ThroughputQueue.java | 382 +++++++++++++++++++ .../local/queues/ThroughputQueueMXBean.java | 48 +++ .../queues/ThroughputQueueMulitThreadTest.java | 285 ++++++++++++++ .../queues/ThroughputQueueSingleThreadTest.java | 159 ++++++++ 4 files changed, 874 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java new file mode 100644 index 0000000..71f819d --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java @@ -0,0 +1,382 @@ +package org.apache.streams.local.queues; + +import net.jcip.annotations.GuardedBy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import javax.management.*; +import java.lang.management.ManagementFactory; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A {@link java.util.concurrent.BlockingQueue} implementation that allows the measure measurement of how + * data flows through the queue. Is also a <code>MBean</code> so the flow statistics can be viewed through + * JMX. Registration of the bean happens whenever a constructor receives a non-null id. + * + * !!! Warning !!! + * Only the necessary methods for the local streams runtime are implemented. All other methods throw a + * {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}. + */ +public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean { + + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class); + + private BlockingQueue<ThroughputElement<E>> underlyingQueue; + private ReadWriteLock putCountsLock; + private ReadWriteLock takeCountsLock; + @GuardedBy("putCountsLock") + private long elementsAdded; + @GuardedBy("takeCountsLock") + private long elementsRemoved; + @GuardedBy("this") + private long startTime; + @GuardedBy("takeCountsLock") + private long totalQueueTime; + @GuardedBy("takeCountsLock") + private long maxQueuedTime; + private volatile boolean active; + + /** + * Creates an unbounded, unregistered <code>ThroughputQueue</code> + */ + public ThroughputQueue() { + this(-1, null); + } + + /** + * Creates a bounded, unregistered <code>ThroughputQueue</code> + * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded + */ + public ThroughputQueue(int maxSize) { + this(maxSize, null); + } + + /** + * Creates an unbounded, registered <code>ThroughputQueue</code> + * @param id unique id for this queue to be registered with. if id == NULL then not registered + */ + public ThroughputQueue(String id) { + this(-1, id); + } + + /** + * Creates a bounded, registered <code>ThroughputQueue</code> + * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded + * @param id unique id for this queue to be registered with. if id == NULL then not registered + */ + public ThroughputQueue(int maxSize, String id) { + if(maxSize < 1) { + this.underlyingQueue = new LinkedBlockingQueue<>(); + } else { + this.underlyingQueue = new LinkedBlockingQueue<>(maxSize); + } + this.elementsAdded = 0; + this.elementsRemoved = 0; + this.startTime = -1; + this.putCountsLock = new ReentrantReadWriteLock(); + this.takeCountsLock = new ReentrantReadWriteLock(); + this.active = false; + this.maxQueuedTime = -1; + if(id != null) { + try { + ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id)); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(this, name); + } catch (MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException e) { + LOGGER.error("Failed to register MXBean : {}", e); + throw new RuntimeException(e); + } + } + } + + @Override + public boolean add(E e) { + throw new NotImplementedException(); + } + + @Override + public boolean offer(E e) { + throw new NotImplementedException(); + } + + @Override + public void put(E e) throws InterruptedException { + this.underlyingQueue.put(new ThroughputElement<E>(e)); + try { + this.putCountsLock.writeLock().lockInterruptibly(); + ++this.elementsAdded; + } finally { + this.putCountsLock.writeLock().unlock(); + } + synchronized (this) { + if (!this.active) { + this.startTime = System.currentTimeMillis(); + this.active = true; + } + } + + } + + @Override + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) { + try { + this.putCountsLock.writeLock().lockInterruptibly(); + ++this.elementsAdded; + } finally { + this.putCountsLock.writeLock().unlock(); + } + synchronized (this) { + if (!this.active) { + this.startTime = System.currentTimeMillis(); + this.active = true; + } + } + return true; + } + return false; + } + + @Override + public E take() throws InterruptedException { + ThroughputElement<E> e = this.underlyingQueue.take(); + try { + this.takeCountsLock.writeLock().lockInterruptibly(); + ++this.elementsRemoved; + Long queueTime = e.getWaited(); + this.totalQueueTime += queueTime; + if(this.maxQueuedTime < queueTime) { + this.maxQueuedTime = queueTime; + } + } finally { + this.takeCountsLock.writeLock().unlock(); + } + return e.getElement(); + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + throw new NotImplementedException(); + } + + @Override + public int remainingCapacity() { + throw new NotImplementedException(); + } + + @Override + public boolean remove(Object o) { + throw new NotImplementedException(); + } + + @Override + public boolean contains(Object o) { + throw new NotImplementedException(); + } + + @Override + public int drainTo(Collection<? super E> c) { + throw new NotImplementedException(); + } + + @Override + public int drainTo(Collection<? super E> c, int maxElements) { + throw new NotImplementedException(); + } + + @Override + public E remove() { + throw new NotImplementedException(); + } + + @Override + public E poll() { + throw new NotImplementedException(); + } + + @Override + public E element() { + throw new NotImplementedException(); + } + + @Override + public E peek() { + throw new NotImplementedException(); + } + + @Override + public int size() { + return this.underlyingQueue.size(); + } + + @Override + public boolean isEmpty() { + return this.underlyingQueue.isEmpty(); + } + + @Override + public Iterator<E> iterator() { + throw new NotImplementedException(); + } + + @Override + public Object[] toArray() { + throw new NotImplementedException(); + } + + @Override + public <T> T[] toArray(T[] a) { + throw new NotImplementedException(); + } + + @Override + public boolean containsAll(Collection<?> c) { + throw new NotImplementedException(); + } + + @Override + public boolean addAll(Collection<? extends E> c) { + throw new NotImplementedException(); + } + + @Override + public boolean removeAll(Collection<?> c) { + throw new NotImplementedException(); + } + + @Override + public boolean retainAll(Collection<?> c) { + throw new NotImplementedException(); + } + + @Override + public void clear() { + throw new NotImplementedException(); + } + + @Override + public long getCurrentSize() { + long size = -1; + try { + this.putCountsLock.readLock().lock(); + try { + this.takeCountsLock.readLock().lock(); + size = this.elementsAdded - this.elementsRemoved; + } finally { + this.takeCountsLock.readLock().unlock(); + } + } finally { + this.putCountsLock.readLock().unlock(); + } + return size; + } + + @Override + public double getAvgWait() { + double avg = -1.0; + try { + this.takeCountsLock.readLock().lock(); + avg = (double) this.totalQueueTime / (double) this.elementsRemoved; + } finally { + this.takeCountsLock.readLock().unlock(); + } + return avg; + } + + @Override + public long getMaxWait() { + ThroughputElement<E> e = this.underlyingQueue.peek(); + long max = -1; + try { + this.takeCountsLock.readLock().lock(); + if (e != null && e.getWaited() > this.maxQueuedTime) { + max = e.getWaited(); + } else { + max = this.maxQueuedTime; + } + } finally { + this.takeCountsLock.readLock().unlock(); + } + return max; + } + + @Override + public long getRemoved() { + long num = -1; + try { + this.takeCountsLock.readLock().lock(); + num = this.elementsRemoved; + } finally { + this.takeCountsLock.readLock().unlock(); + } + return num; + } + + @Override + public long getAdded() { + long num = -1; + try { + this.putCountsLock.readLock().lock(); + num = this.elementsAdded; + } finally { + this.putCountsLock.readLock().unlock(); + } + return num; + } + + @Override + public double getThroughput() { + double tp = -1.0; + synchronized (this) { + try { + this.takeCountsLock.readLock().lock(); + tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0); + } finally { + this.takeCountsLock.readLock().unlock(); + } + } + return tp; + } + + + /** + * Element wrapper to measure time waiting on the queue + * @param <E> + */ + private class ThroughputElement<E> { + + private long queuedTime; + private E element; + + protected ThroughputElement(E element) { + this.element = element; + this.queuedTime = System.currentTimeMillis(); + } + + /** + * Get the time this element has been waiting on the queue. + * current time - time element was queued + * @return time this element has been waiting on the queue in milliseconds + */ + public long getWaited() { + return System.currentTimeMillis() - this.queuedTime; + } + + /** + * Get the queued element + * @return the element + */ + public E getElement() { + return this.element; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java new file mode 100644 index 0000000..00d3a47 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java @@ -0,0 +1,48 @@ +package org.apache.streams.local.queues; + +import javax.management.MXBean; + +/** + * MXBean capable queue that monitors the throughput of the queue + */ +public interface ThroughputQueueMXBean { + + /** + * Returns the number of items on the queue. + * @return number of items on queue + */ + public long getCurrentSize(); + + /** + * Get the average time an item spends in queue in milliseconds + * @return average time an item spends in queue in milliseconds + */ + public double getAvgWait(); + + /** + * Get the maximum time an item has spent on the queue before being removed from the queue. + * @return the maximum time an item has spent on the queue + */ + public long getMaxWait(); + + /** + * Get the number of items that have been removed from this queue + * @return number of items that have been removed from the queue + */ + public long getRemoved(); + + /** + * Get the number of items that have been added to the queue + * @return number of items that have been added to the queue + */ + public long getAdded(); + + /** + * Get the the throughput of the queue measured by the time the queue has been active divided by + * the number of items removed from the queue. Active time starts once the first item has been pl + * @return throughput of queue. items/sec, items removed / time active + */ + public double getThroughput(); + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java new file mode 100644 index 0000000..f4a0156 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java @@ -0,0 +1,285 @@ +package org.apache.streams.local.queues; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.concurrent.*; + +/** + * MultiThread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue} + */ +public class ThroughputQueueMulitThreadTest extends RandomizedTest { + + private final static Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMulitThreadTest.class); + private static final String MBEAN_ID = "testQueue"; + + /** + * Remove registered mbeans from previous tests + * @throws Exception + */ + @After + public void unregisterMXBean() throws Exception { + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID))); + } catch (InstanceNotFoundException ife) { + //No-op + } + } + + + /** + * Test that queue will block on puts when the queue is full + * @throws InterruptedException + */ + @Test + public void testBlockOnFullQueue() throws InterruptedException { + int queueSize = randomIntBetween(1, 3000); + ExecutorService executor = Executors.newSingleThreadExecutor(); + CountDownLatch full = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + ThroughputQueue queue = new ThroughputQueue(queueSize); + BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize); + executor.submit(testThread); + full.await(); + assertEquals(queueSize, queue.size()); + assertEquals(queueSize, queue.getCurrentSize()); + assertFalse(testThread.isComplete()); //test that it is blocked + safeSleep(1000); + assertFalse(testThread.isComplete()); //still blocked + queue.take(); + finished.await(); + assertEquals(queueSize, queue.size()); + assertEquals(queueSize, queue.getCurrentSize()); + assertTrue(testThread.isComplete()); + executor.shutdownNow(); + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } + + /** + * Test that queue will block on Take when queue is empty + * @throws InterruptedException + */ + @Test + public void testBlockOnEmptyQueue() throws InterruptedException { + int queueSize = randomIntBetween(1, 3000); + ExecutorService executor = Executors.newSingleThreadExecutor(); + CountDownLatch empty = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + ThroughputQueue queue = new ThroughputQueue(); + BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue); + for(int i=0; i < queueSize; ++i) { + queue.put(i); + } + executor.submit(testThread); + empty.await(); + assertEquals(0, queue.size()); + assertEquals(0, queue.getCurrentSize()); + assertFalse(testThread.isComplete()); + safeSleep(1000); + assertFalse(testThread.isComplete()); + queue.put(1); + finished.await(); + assertEquals(0, queue.size()); + assertEquals(0, queue.getCurrentSize()); + assertTrue(testThread.isComplete()); + executor.shutdownNow(); + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } + + + /** + * Test multiple threads putting and taking from the queue while + * this thread repeatedly calls the MXBean measurement methods. + * Should hammer the queue with request from multiple threads + * of all request types. Purpose is to expose current modification exceptions + * and/or dead locks. + */ + @Test + @Repeat(iterations = 3) + public void testMultiThreadAccessAndInteruptResponse() throws Exception { + int putTakeThreadCount = randomIntBetween(1, 10); + int dataCount = randomIntBetween(1, 2000000); + int pollCount = randomIntBetween(1, 2000000); + int maxSize = randomIntBetween(1, 1000); + CountDownLatch finished = new CountDownLatch(putTakeThreadCount); + ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID); + ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2); + for(int i=0; i < putTakeThreadCount; ++i) { + executor.submit(new PutData(finished, queue, dataCount)); + executor.submit(new TakeData(queue)); + } + for(int i=0; i < pollCount; ++i) { + queue.getAvgWait(); + queue.getAdded(); + queue.getCurrentSize(); + queue.getMaxWait(); + queue.getRemoved(); + queue.getThroughput(); + } + finished.await(); + while(!queue.isEmpty()) { + LOGGER.info("Waiting for queue to be emptied..."); + safeSleep(500); + } + long totalData = ((long) dataCount) * putTakeThreadCount; + assertEquals(totalData, queue.getAdded()); + assertEquals(totalData, queue.getRemoved()); + executor.shutdown(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts + executor.shutdownNow(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes + //Randomized should not report thread leak + } + + + + private void safeSleep(long sleep) { + try { + Thread.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + + + + /** + * Helper runnable for test {@link ThroughputQueueMulitThreadTest#testBlockOnFullQueue()} + */ + private class BlocksOnFullQueue implements Runnable { + + private CountDownLatch full; + volatile private boolean complete; + private int queueSize; + private CountDownLatch finished; + private BlockingQueue queue; + + public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) { + this.full = latch; + this.complete = false; + this.queueSize = queueSize; + this.finished = finished; + this.queue = queue; + } + + @Override + public void run() { + try { + for (int i = 0; i < this.queueSize; ++i) { + this.queue.put(i); + } + this.full.countDown(); + this.queue.put(0); + this.complete = true; + this.finished.countDown(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + public boolean isComplete() { + return this.complete; + } + } + + + /** + * Helper runnable class for test {@link ThroughputQueueMulitThreadTest#testBlockOnEmptyQueue()} + */ + private class BlocksOnEmptyQueue implements Runnable { + + private CountDownLatch full; + volatile private boolean complete; + private int queueSize; + private CountDownLatch finished; + private BlockingQueue queue; + + public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) { + this.full = full; + this.finished = finished; + this.queueSize = queueSize; + this.queue = queue; + this.complete = false; + } + + + @Override + public void run() { + try { + for(int i=0; i < this.queueSize; ++i) { + this.queue.take(); + } + this.full.countDown(); + this.queue.take(); + this.complete = true; + this.finished.countDown(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + public boolean isComplete() { + return this.complete; + } + } + + + private class PutData implements Runnable { + + private BlockingQueue queue; + private int dataCount; + private CountDownLatch finished; + + public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) { + this.queue = queue; + this.dataCount = dataCount; + this.finished = finished; + } + + + @Override + public void run() { + try { + for(int i=0; i < this.dataCount; ++i) { + this.queue.put(i); + } + } catch (InterruptedException ie) { + LOGGER.error("PUT DATA interupted !"); + Thread.currentThread().interrupt(); + } + this.finished.countDown(); + } + } + + + private class TakeData implements Runnable { + + private BlockingQueue queue; + + public TakeData(BlockingQueue queue) { + this.queue = queue; + } + + + @Override + public void run() { + try { + while(true) { + this.queue.take(); + } + } catch (InterruptedException ie) { + LOGGER.error("PUT DATA interupted !"); + Thread.currentThread().interrupt(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java new file mode 100644 index 0000000..496837a --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java @@ -0,0 +1,159 @@ +package org.apache.streams.local.queues; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.junit.Test; + +import javax.management.MBeanServer; +import javax.management.ObjectInstance; +import javax.management.ObjectName; + +import java.lang.management.ManagementFactory; + +import static org.junit.Assert.assertEquals; + +/** + * Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue} + */ +public class ThroughputQueueSingleThreadTest extends RandomizedTest { + + + /** + * Test that take and put queue and dequeue data as expected and all + * measurements form the queue are returning data. + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testTakeAndPut() throws Exception { + ThroughputQueue<Integer> queue = new ThroughputQueue<>(); + int putCount = randomIntBetween(1, 1000); + for(int i=0; i < putCount; ++i) { + queue.put(i); + assertEquals(i+1, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + } + int takeCount = randomIntBetween(1, putCount); + for(int i=0; i < takeCount; ++i) { + Integer element = queue.take(); + assertNotNull(element); + assertEquals(i, element.intValue()); + assertEquals(putCount - (1+i), queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + } + assertEquals(putCount-takeCount, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + assertTrue(0 < queue.getMaxWait()); + assertTrue(0 < queue.getAvgWait()); + assertTrue(0 < queue.getThroughput()); + assertEquals(putCount, queue.getAdded()); + assertEquals(takeCount, queue.getRemoved()); + } + + /** + * Test that max wait and avg wait return expected values + * @throws Exception + */ + @Test + public void testWait() throws Exception { + ThroughputQueue queue = new ThroughputQueue(); + int wait = 1000; + + for(int i=0; i < 3; ++i) { + queue.put(1); + safeSleep(wait); + queue.take(); + assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 1.2));//can't calculate exactly, making sure its close. + assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 1.2)); + } + queue.put(1); + queue.take(); + assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 1.2));//can't calculate exactly, making sure its close. + assertTrue(queue.getAvgWait() <= 1000 ); + assertTrue(queue.getAvgWait() >= 750); + } + + /** + * Test that throughput returns expected values. + * @throws Exception + */ + @Test + public void testThroughput() throws Exception { + ThroughputQueue queue = new ThroughputQueue(); + int wait = 100; + for(int i=0; i < 10; ++i) { + queue.put(1); + safeSleep(wait); + queue.take(); + } + double throughput = queue.getThroughput(); + assertTrue(throughput <= 10 ); //can't calculate exactly, making sure its close. + assertTrue(throughput >= 9.5); + + queue = new ThroughputQueue(); + wait = 1000; + for(int i=0; i < 10; ++i) { + queue.put(1); + } + for(int i=0; i < 10; ++i) { + queue.take(); + } + safeSleep(wait); + throughput = queue.getThroughput(); + assertTrue(throughput <= 10 ); //can't calculate exactly, making sure its close. + assertTrue(throughput >= 9.5); + } + + + /** + * Test that the mbean registers + */ + @Test + public void testMBeanRegistration() { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + Integer beanCount = mbs.getMBeanCount(); + String id = "testQueue"; + ThroughputQueue queue = new ThroughputQueue(id); + assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount()); + ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id))); + assertNotNull(mBean); + } catch (Exception e) { + fail("Failed to register MXBean : "+e.getMessage()); + } + } + + /** + * Test that mulitple mbeans of the same type with a different name can be registered + */ + @Test + public void testMultipleMBeanRegistrations() { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + Integer beanCount = mbs.getMBeanCount(); + String id = "testQueue"; + int numReg = randomIntBetween(2, 100); + for(int i=0; i < numReg; ++i) { + ThroughputQueue queue = new ThroughputQueue(id+i); + assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount()); + ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i))); + assertNotNull(mBean); + } + } catch (Exception e) { + fail("Failed to register MXBean : "+e.getMessage()); + } + } + + + private void safeSleep(long sleep) { + try { + Thread.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + + + +}
