Repository: incubator-streams
Updated Branches:
  refs/heads/master a7a40125d -> a973ba217


Added ThroughputQueue and Tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/95033095
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/95033095
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/95033095

Branch: refs/heads/master
Commit: 950330952a4aab51e7f273c27e8dad2eafeed0b5
Parents: c483941
Author: Ryan Ebanks <[email protected]>
Authored: Mon Oct 6 15:38:37 2014 -0500
Committer: Ryan Ebanks <[email protected]>
Committed: Mon Oct 6 15:38:37 2014 -0500

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 382 +++++++++++++++++++
 .../local/queues/ThroughputQueueMXBean.java     |  48 +++
 .../queues/ThroughputQueueMulitThreadTest.java  | 285 ++++++++++++++
 .../queues/ThroughputQueueSingleThreadTest.java | 159 ++++++++
 4 files changed, 874 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
new file mode 100644
index 0000000..71f819d
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -0,0 +1,382 @@
+package org.apache.streams.local.queues;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A {@link java.util.concurrent.BlockingQueue} implementation that allows the 
measurement of how
+ * data flows through the queue.  It is also an <code>MBean</code> so the flow 
statistics can be viewed through
+ * JMX. Registration of the bean happens whenever a constructor receives a 
non-null id.
+ *
+ * !!! Warning !!!
+ * Only the necessary methods for the local streams runtime are implemented.  
All other methods throw a
+ * {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
+ */
+public class ThroughputQueue<E> implements BlockingQueue<E>, 
ThroughputQueueMXBean {
+
+    public static final String NAME_TEMPLATE = 
"org.apache.streams.local:type=ThroughputQueue,name=%s";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ThroughputQueue.class);
+
+    private BlockingQueue<ThroughputElement<E>> underlyingQueue;
+    private ReadWriteLock putCountsLock;
+    private ReadWriteLock takeCountsLock;
+    @GuardedBy("putCountsLock")
+    private long elementsAdded;
+    @GuardedBy("takeCountsLock")
+    private long elementsRemoved;
+    @GuardedBy("this")
+    private long startTime;
+    @GuardedBy("takeCountsLock")
+    private long totalQueueTime;
+    @GuardedBy("takeCountsLock")
+    private long maxQueuedTime;
+    private volatile boolean active;
+
+    /**
+     * Creates an unbounded, unregistered <code>ThroughputQueue</code>
+     */
+    public ThroughputQueue() {
+        this(-1, null);
+    }
+
+    /**
+     * Creates a bounded, unregistered <code>ThroughputQueue</code>
+     * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
+     */
+    public ThroughputQueue(int maxSize) {
+        this(maxSize, null);
+    }
+
+    /**
+     * Creates an unbounded, registered <code>ThroughputQueue</code>
+     * @param id unique id for this queue to be registered with. If id is null, 
the queue is not registered
+     */
+    public ThroughputQueue(String id) {
+        this(-1, id);
+    }
+
+    /**
+     * Creates a bounded, registered <code>ThroughputQueue</code>
+     * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
+     * @param id unique id for this queue to be registered with. If id is null, 
the queue is not registered
+     */
+    public ThroughputQueue(int maxSize, String id) {
+        if(maxSize < 1) {
+            this.underlyingQueue = new LinkedBlockingQueue<>();
+        } else {
+            this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
+        }
+        this.elementsAdded = 0;
+        this.elementsRemoved = 0;
+        this.startTime = -1;
+        this.putCountsLock = new ReentrantReadWriteLock();
+        this.takeCountsLock = new ReentrantReadWriteLock();
+        this.active = false;
+        this.maxQueuedTime = -1;
+        if(id != null) {
+            try {
+                ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, 
id));
+                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+                mbs.registerMBean(this, name);
+            } catch 
(MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException
 e) {
+                LOGGER.error("Failed to register MXBean : {}", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public boolean add(E e) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean offer(E e) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public void put(E e) throws InterruptedException {
+        this.underlyingQueue.put(new ThroughputElement<E>(e));
+        try {
+            this.putCountsLock.writeLock().lockInterruptibly();
+            ++this.elementsAdded;
+        } finally {
+            this.putCountsLock.writeLock().unlock();
+        }
+        synchronized (this) {
+            if (!this.active) {
+                this.startTime = System.currentTimeMillis();
+                this.active = true;
+            }
+        }
+
+    }
+
+    @Override
+    public boolean offer(E e, long timeout, TimeUnit unit) throws 
InterruptedException {
+        if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, 
unit)) {
+            try {
+                this.putCountsLock.writeLock().lockInterruptibly();
+                ++this.elementsAdded;
+            } finally {
+                this.putCountsLock.writeLock().unlock();
+            }
+            synchronized (this) {
+                if (!this.active) {
+                    this.startTime = System.currentTimeMillis();
+                    this.active = true;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public E take() throws InterruptedException {
+        ThroughputElement<E> e = this.underlyingQueue.take();
+        try {
+            this.takeCountsLock.writeLock().lockInterruptibly();
+            ++this.elementsRemoved;
+            Long queueTime = e.getWaited();
+            this.totalQueueTime += queueTime;
+            if(this.maxQueuedTime < queueTime) {
+                this.maxQueuedTime = queueTime;
+            }
+        } finally {
+            this.takeCountsLock.writeLock().unlock();
+        }
+        return e.getElement();
+    }
+
+    @Override
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public int remainingCapacity() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public int drainTo(Collection<? super E> c) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public E remove() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public E poll() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public E element() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public E peek() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public int size() {
+        return this.underlyingQueue.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return this.underlyingQueue.isEmpty();
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends E> c) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public void clear() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public long getCurrentSize() {
+        long size = -1;
+        try {
+            this.putCountsLock.readLock().lock();
+            try {
+                this.takeCountsLock.readLock().lock();
+                size = this.elementsAdded - this.elementsRemoved;
+            } finally {
+                this.takeCountsLock.readLock().unlock();
+            }
+        } finally {
+            this.putCountsLock.readLock().unlock();
+        }
+        return size;
+    }
+
+    @Override
+    public double getAvgWait() {
+        double avg = -1.0;
+        try {
+            this.takeCountsLock.readLock().lock();
+            avg = (double) this.totalQueueTime / (double) this.elementsRemoved;
+        } finally {
+            this.takeCountsLock.readLock().unlock();
+        }
+        return avg;
+    }
+
+    @Override
+    public long getMaxWait() {
+        ThroughputElement<E> e = this.underlyingQueue.peek();
+        long max = -1;
+        try {
+            this.takeCountsLock.readLock().lock();
+            if (e != null && e.getWaited() > this.maxQueuedTime) {
+                max = e.getWaited();
+            } else {
+                max = this.maxQueuedTime;
+            }
+        } finally {
+            this.takeCountsLock.readLock().unlock();
+        }
+        return max;
+    }
+
+    @Override
+    public long getRemoved() {
+        long num = -1;
+        try {
+            this.takeCountsLock.readLock().lock();
+            num = this.elementsRemoved;
+        } finally {
+            this.takeCountsLock.readLock().unlock();
+        }
+        return num;
+    }
+
+    @Override
+    public long getAdded() {
+        long num = -1;
+        try {
+            this.putCountsLock.readLock().lock();
+            num = this.elementsAdded;
+        } finally {
+            this.putCountsLock.readLock().unlock();
+        }
+        return num;
+    }
+
+    @Override
+    public double getThroughput() {
+        double tp = -1.0;
+        synchronized (this) {
+            try {
+                this.takeCountsLock.readLock().lock();
+                tp = this.elementsRemoved / ((System.currentTimeMillis() - 
this.startTime) / 1000.0);
+            } finally {
+                this.takeCountsLock.readLock().unlock();
+            }
+        }
+        return tp;
+    }
+
+
+    /**
+     * Element wrapper to measure time waiting on the queue
+     * @param <E>
+     */
+    private class ThroughputElement<E> {
+        
+        private long queuedTime;
+        private E element;
+
+        protected ThroughputElement(E element) {
+            this.element = element;
+            this.queuedTime = System.currentTimeMillis();
+        }
+
+        /**
+         * Get the time this element has been waiting on the queue.
+         * current time - time element was queued
+         * @return time this element has been waiting on the queue in 
milliseconds
+         */
+        public long getWaited() {
+            return System.currentTimeMillis() - this.queuedTime;
+        }
+
+        /**
+         * Get the queued element
+         * @return the element
+         */
+        public E getElement() {
+            return this.element;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
new file mode 100644
index 0000000..00d3a47
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
@@ -0,0 +1,48 @@
+package org.apache.streams.local.queues;
+
+import javax.management.MXBean;
+
+/**
+ * MXBean capable queue that monitors the throughput of the queue
+ */
+public interface ThroughputQueueMXBean {
+
+    /**
+     * Returns the number of items on the queue.
+     * @return number of items on queue
+     */ 
+    public long getCurrentSize();
+
+    /**
+     * Get the average time an item spends in queue in milliseconds
+     * @return average time an item spends in queue in milliseconds
+     */
+    public double getAvgWait();
+
+    /**
+     * Get the maximum time an item has spent on the queue before being 
removed from the queue.
+     * @return the maximum time an item has spent on the queue in milliseconds
+     */
+    public long getMaxWait();
+
+    /**
+     * Get the number of items that have been removed from this queue
+     * @return number of items that have been removed from the queue
+     */
+    public long getRemoved();
+
+    /**
+     * Get the number of items that have been added to the queue
+     * @return number of items that have been added to the queue
+     */
+    public long getAdded();
+
+    /**
+     * Get the throughput of the queue measured by the number of items removed 
from the queue divided by
+     * the time the queue has been active.  Active time starts once 
the first item has been placed on the queue.
+     * @return throughput of queue. items/sec, items removed / time active
+     */
+    public double getThroughput();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
new file mode 100644
index 0000000..f4a0156
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
@@ -0,0 +1,285 @@
+package org.apache.streams.local.queues;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.*;
+
+/**
+ * MultiThread unit tests for {@link 
org.apache.streams.local.queues.ThroughputQueue}
+ */
+public class ThroughputQueueMulitThreadTest extends RandomizedTest {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(ThroughputQueueMulitThreadTest.class);
+    private static final String MBEAN_ID = "testQueue";
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new 
ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
+
+    /**
+     * Test that queue will block on puts when the queue is full
+     * @throws InterruptedException
+     */
+    @Test
+    public void testBlockOnFullQueue() throws InterruptedException {
+        int queueSize = randomIntBetween(1, 3000);
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CountDownLatch full = new CountDownLatch(1);
+        CountDownLatch finished = new CountDownLatch(1);
+        ThroughputQueue queue = new ThroughputQueue(queueSize);
+        BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, 
queue, queueSize);
+        executor.submit(testThread);
+        full.await();
+        assertEquals(queueSize, queue.size());
+        assertEquals(queueSize, queue.getCurrentSize());
+        assertFalse(testThread.isComplete()); //test that it is blocked
+        safeSleep(1000);
+        assertFalse(testThread.isComplete()); //still blocked
+        queue.take();
+        finished.await();
+        assertEquals(queueSize, queue.size());
+        assertEquals(queueSize, queue.getCurrentSize());
+        assertTrue(testThread.isComplete());
+        executor.shutdownNow();
+        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Test that queue will block on Take when queue is empty
+     * @throws InterruptedException
+     */
+    @Test
+    public void testBlockOnEmptyQueue() throws InterruptedException {
+        int queueSize = randomIntBetween(1, 3000);
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CountDownLatch empty = new CountDownLatch(1);
+        CountDownLatch finished = new CountDownLatch(1);
+        ThroughputQueue queue = new ThroughputQueue();
+        BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, 
finished, queueSize, queue);
+        for(int i=0; i < queueSize; ++i) {
+            queue.put(i);
+        }
+        executor.submit(testThread);
+        empty.await();
+        assertEquals(0, queue.size());
+        assertEquals(0, queue.getCurrentSize());
+        assertFalse(testThread.isComplete());
+        safeSleep(1000);
+        assertFalse(testThread.isComplete());
+        queue.put(1);
+        finished.await();
+        assertEquals(0, queue.size());
+        assertEquals(0, queue.getCurrentSize());
+        assertTrue(testThread.isComplete());
+        executor.shutdownNow();
+        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    }
+
+
+    /**
+     * Test multiple threads putting and taking from the queue while
+     * this thread repeatedly calls the MXBean measurement methods.
+     * Should hammer the queue with requests of all types from multiple
+     * threads.  Purpose is to expose concurrent modification 
exceptions
+     * and/or deadlocks.
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testMultiThreadAccessAndInteruptResponse() throws Exception {
+        int putTakeThreadCount = randomIntBetween(1, 10);
+        int dataCount = randomIntBetween(1, 2000000);
+        int pollCount = randomIntBetween(1, 2000000);
+        int maxSize = randomIntBetween(1, 1000);
+        CountDownLatch finished = new CountDownLatch(putTakeThreadCount);
+        ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID);
+        ExecutorService executor = 
Executors.newFixedThreadPool(putTakeThreadCount * 2);
+        for(int i=0; i < putTakeThreadCount; ++i) {
+            executor.submit(new PutData(finished, queue, dataCount));
+            executor.submit(new TakeData(queue));
+        }
+        for(int i=0; i < pollCount; ++i) {
+            queue.getAvgWait();
+            queue.getAdded();
+            queue.getCurrentSize();
+            queue.getMaxWait();
+            queue.getRemoved();
+            queue.getThroughput();
+        }
+        finished.await();
+        while(!queue.isEmpty()) {
+            LOGGER.info("Waiting for queue to be emptied...");
+            safeSleep(500);
+        }
+        long totalData = ((long) dataCount) * putTakeThreadCount;
+        assertEquals(totalData, queue.getAdded());
+        assertEquals(totalData, queue.getRemoved());
+        executor.shutdown();
+        executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts
+        executor.shutdownNow();
+        executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown 
takes
+        //Randomized should not report thread leak
+    }
+
+
+
+    private void safeSleep(long sleep) {
+        try {
+            Thread.sleep(sleep);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+
+
+    /**
+     * Helper runnable for test {@link 
ThroughputQueueMulitThreadTest#testBlockOnFullQueue()}
+     */
+    private class BlocksOnFullQueue implements Runnable {
+
+        private CountDownLatch full;
+        volatile private boolean complete;
+        private int queueSize;
+        private CountDownLatch finished;
+        private BlockingQueue queue;
+
+        public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch 
finished, BlockingQueue queue, int queueSize) {
+            this.full = latch;
+            this.complete = false;
+            this.queueSize = queueSize;
+            this.finished = finished;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                for (int i = 0; i < this.queueSize; ++i) {
+                    this.queue.put(i);
+                }
+                this.full.countDown();
+                this.queue.put(0);
+                this.complete = true;
+                this.finished.countDown();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        public boolean isComplete() {
+            return this.complete;
+        }
+    }
+
+
+    /**
+     * Helper runnable class for test {@link 
ThroughputQueueMulitThreadTest#testBlockOnEmptyQueue()}
+     */
+    private class BlocksOnEmptyQueue implements Runnable {
+
+        private CountDownLatch full;
+        volatile private boolean complete;
+        private int queueSize;
+        private CountDownLatch finished;
+        private BlockingQueue queue;
+
+        public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch 
finished, int queueSize, BlockingQueue queue) {
+            this.full = full;
+            this.finished = finished;
+            this.queueSize = queueSize;
+            this.queue = queue;
+            this.complete = false;
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                for(int i=0; i < this.queueSize; ++i) {
+                    this.queue.take();
+                }
+                this.full.countDown();
+                this.queue.take();
+                this.complete = true;
+                this.finished.countDown();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        public boolean isComplete() {
+            return this.complete;
+        }
+    }
+
+
+    private class PutData implements Runnable {
+
+        private BlockingQueue queue;
+        private int dataCount;
+        private CountDownLatch finished;
+
+        public PutData(CountDownLatch finished, BlockingQueue queue, int 
dataCount) {
+            this.queue = queue;
+            this.dataCount = dataCount;
+            this.finished = finished;
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                for(int i=0; i < this.dataCount; ++i) {
+                    this.queue.put(i);
+                }
+            } catch (InterruptedException ie) {
+                LOGGER.error("PUT DATA interupted !");
+                Thread.currentThread().interrupt();
+            }
+            this.finished.countDown();
+        }
+    }
+
+
+    private class TakeData implements Runnable {
+
+        private BlockingQueue queue;
+
+        public TakeData(BlockingQueue queue) {
+            this.queue = queue;
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                while(true) {
+                    this.queue.take();
+                }
+            } catch (InterruptedException ie) {
+                LOGGER.error("PUT DATA interupted !");
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
new file mode 100644
index 0000000..496837a
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -0,0 +1,159 @@
+package org.apache.streams.local.queues;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import java.lang.management.ManagementFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Single thread unit tests for {@link 
org.apache.streams.local.queues.ThroughputQueue}
+ */
+public class ThroughputQueueSingleThreadTest extends RandomizedTest {
+
+
+    /**
+     * Test that put and take enqueue and dequeue data as expected and that all
+     * measurements from the queue are returning data.
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testTakeAndPut() throws Exception {
+        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+        int putCount = randomIntBetween(1, 1000);
+        for(int i=0; i < putCount; ++i) {
+            queue.put(i);
+            assertEquals(i+1, queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        int takeCount = randomIntBetween(1, putCount);
+        for(int i=0; i < takeCount; ++i) {
+            Integer element = queue.take();
+            assertNotNull(element);
+            assertEquals(i, element.intValue());
+            assertEquals(putCount - (1+i), queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        assertEquals(putCount-takeCount, queue.size());
+        assertEquals(queue.size(), queue.getCurrentSize());
+        assertTrue(0 < queue.getMaxWait());
+        assertTrue(0 < queue.getAvgWait());
+        assertTrue(0 < queue.getThroughput());
+        assertEquals(putCount, queue.getAdded());
+        assertEquals(takeCount, queue.getRemoved());
+    }
+
+    /**
+     * Test that max wait and avg wait return expected values
+     * @throws Exception
+     */
+    @Test
+    public void testWait() throws Exception {
+        ThroughputQueue queue = new ThroughputQueue();
+        int wait = 1000;
+
+        for(int i=0; i < 3; ++i) {
+            queue.put(1);
+            safeSleep(wait);
+            queue.take();
+            assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= 
(wait * 1.2));//can't calculate exactly, making sure its close.
+            assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= 
(wait * 1.2));
+        }
+        queue.put(1);
+        queue.take();
+        assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 
1.2));//can't calculate exactly, making sure its close.
+        assertTrue(queue.getAvgWait() <= 1000 );
+        assertTrue(queue.getAvgWait() >= 750);
+    }
+
+    /**
+     * Test that throughput returns expected values.
+     * @throws Exception
+     */
+    @Test
+    public void testThroughput() throws Exception {
+        ThroughputQueue queue = new ThroughputQueue();
+        int wait = 100;
+        for(int i=0; i < 10; ++i) {
+            queue.put(1);
+            safeSleep(wait);
+            queue.take();
+        }
+        double throughput = queue.getThroughput();
+        assertTrue(throughput <= 10 ); //can't calculate exactly, making sure 
its close.
+        assertTrue(throughput >= 9.5);
+
+        queue = new ThroughputQueue();
+        wait = 1000;
+        for(int i=0; i < 10; ++i) {
+            queue.put(1);
+        }
+        for(int i=0; i < 10; ++i) {
+            queue.take();
+        }
+        safeSleep(wait);
+        throughput = queue.getThroughput();
+        assertTrue(throughput <= 10 ); //can't calculate exactly, making sure 
its close.
+        assertTrue(throughput >= 9.5);
+    }
+
+
+    /**
+     * Test that the mbean registers
+     */
+    @Test
+    public void testMBeanRegistration() {
+        try {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            Integer beanCount = mbs.getMBeanCount();
+            String id = "testQueue";
+            ThroughputQueue queue = new ThroughputQueue(id);
+            assertEquals("Expected bean to be registered", new 
Integer(beanCount+1), mbs.getMBeanCount());
+            ObjectInstance mBean = mbs.getObjectInstance(new 
ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id)));
+            assertNotNull(mBean);
+        } catch (Exception e) {
+            fail("Failed to register MXBean : "+e.getMessage());
+        }
+    }
+
+    /**
+     * Test that multiple mbeans of the same type with different names can be 
registered
+     */
+    @Test
+    public void testMultipleMBeanRegistrations() {
+        try {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            Integer beanCount = mbs.getMBeanCount();
+            String id = "testQueue";
+            int numReg = randomIntBetween(2, 100);
+            for(int i=0; i < numReg; ++i) {
+                ThroughputQueue queue = new ThroughputQueue(id+i);
+                assertEquals("Expected bean to be registered", new 
Integer(beanCount + (i+1)), mbs.getMBeanCount());
+                ObjectInstance mBean = mbs.getObjectInstance(new 
ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i)));
+                assertNotNull(mBean);
+            }
+        } catch (Exception e) {
+            fail("Failed to register MXBean : "+e.getMessage());
+        }
+    }
+
+
+    private void safeSleep(long sleep) {
+        try {
+            Thread.sleep(sleep);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+
+
+}

Reply via email to