STREAMS-190 | Simplified locks to AtomicLongs and added more implementations and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/126a34f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/126a34f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/126a34f3 Branch: refs/heads/master Commit: 126a34f355f9c39774fdd2f36570f1b258d829c2 Parents: e455160 Author: Ryan Ebanks <[email protected]> Authored: Fri Oct 10 11:10:48 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Fri Oct 10 11:10:48 2014 -0500 ---------------------------------------------------------------------- .../streams/local/queues/ThroughputQueue.java | 271 ++++++++++++++----- .../queues/ThroughputQueueSingleThreadTest.java | 66 +++++ 2 files changed, 262 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java index 99c8cbf..aacecc8 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java @@ -17,7 +17,6 @@ */ package org.apache.streams.local.queues; -import net.jcip.annotations.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.reflect.generics.reflectiveObjects.NotImplementedException; @@ -37,7 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * A {@link java.util.concurrent.BlockingQueue} implementation that allows the measure measurement of how * data flows through the queue. Is also a {@code MBean} so the flow statistics can be viewed through * JMX. Registration of the bean happens whenever a constructor receives a non-null id. - * + * <p/> * !!! Warning !!! * Only the necessary methods for the local streams runtime are implemented. All other methods throw a * {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}. @@ -49,17 +48,13 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class); private BlockingQueue<ThroughputElement<E>> underlyingQueue; - private ReadWriteLock takeCountsLock; private AtomicLong elementsAdded; - @GuardedBy("takeCountsLock") - private long elementsRemoved; - @GuardedBy("this") - private long startTime; - @GuardedBy("takeCountsLock") - private long totalQueueTime; - @GuardedBy("takeCountsLock") + private AtomicLong elementsRemoved; + private AtomicLong startTime; + private AtomicLong totalQueueTime; private long maxQueuedTime; private volatile boolean active; + private ReadWriteLock maxQueueTimeLock; /** * Creates an unbounded, unregistered {@code ThroughputQueue} @@ -70,6 +65,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe /** * Creates a bounded, unregistered {@code ThroughputQueue} + * * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded */ public ThroughputQueue(int maxSize) { @@ -78,6 +74,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe /** * Creates an unbounded, registered {@code ThroughputQueue} + * * @param id unique id for this queue to be registered with. if id == NULL then not registered */ public ThroughputQueue(String id) { @@ -86,27 +83,29 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe /** * Creates a bounded, registered {@code ThroughputQueue} + * * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded - * @param id unique id for this queue to be registered with. if id == NULL then not registered + * @param id unique id for this queue to be registered with. if id == NULL then not registered */ public ThroughputQueue(int maxSize, String id) { - if(maxSize < 1) { + if (maxSize < 1) { this.underlyingQueue = new LinkedBlockingQueue<>(); } else { this.underlyingQueue = new LinkedBlockingQueue<>(maxSize); } this.elementsAdded = new AtomicLong(0); - this.elementsRemoved = 0; - this.startTime = -1; - this.takeCountsLock = new ReentrantReadWriteLock(); + this.elementsRemoved = new AtomicLong(0); + this.startTime = new AtomicLong(-1); this.active = false; - this.maxQueuedTime = -1; - if(id != null) { + this.maxQueuedTime = 0; + this.maxQueueTimeLock = new ReentrantReadWriteLock(); + this.totalQueueTime = new AtomicLong(0); + if (id != null) { try { ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id)); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); mbs.registerMBean(this, name); - } catch (MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException e) { + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { LOGGER.error("Failed to register MXBean : {}", e); throw new RuntimeException(e); } @@ -115,12 +114,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public boolean add(E e) { - throw new NotImplementedException(); + if (this.underlyingQueue.add(new ThroughputElement<E>(e))) { + this.elementsAdded.incrementAndGet(); + synchronized (this) { + if (!this.active) { + this.startTime.set(System.currentTimeMillis()); + this.active = true; + } + } + return true; + } + return false; } @Override public boolean offer(E e) { - throw new NotImplementedException(); + if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) { + this.elementsAdded.incrementAndGet(); + synchronized (this) { + if (!this.active) { + this.startTime.set(System.currentTimeMillis()); + this.active = true; + } + } + return true; + } + return false; } @Override @@ -129,20 +148,19 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe this.elementsAdded.incrementAndGet(); synchronized (this) { if (!this.active) { - this.startTime = System.currentTimeMillis(); + this.startTime.set(System.currentTimeMillis()); this.active = true; } } - } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { - if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) { + if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) { this.elementsAdded.incrementAndGet(); synchronized (this) { if (!this.active) { - this.startTime = System.currentTimeMillis(); + this.startTime.set(System.currentTimeMillis()); this.active = true; } } @@ -154,38 +172,79 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public E take() throws InterruptedException { ThroughputElement<E> e = this.underlyingQueue.take(); + this.elementsRemoved.incrementAndGet(); + Long queueTime = e.getWaited(); + this.totalQueueTime.addAndGet(queueTime); + boolean unlocked = false; try { - this.takeCountsLock.writeLock().lockInterruptibly(); - ++this.elementsRemoved; - Long queueTime = e.getWaited(); - this.totalQueueTime += queueTime; - if(this.maxQueuedTime < queueTime) { - this.maxQueuedTime = queueTime; + this.maxQueueTimeLock.readLock().lock(); + if (this.maxQueuedTime < queueTime) { + this.maxQueueTimeLock.readLock().unlock(); + unlocked = true; + try { + this.maxQueueTimeLock.writeLock().lock(); + this.maxQueuedTime = queueTime; + } finally { + this.maxQueueTimeLock.writeLock().unlock(); + } } } finally { - this.takeCountsLock.writeLock().unlock(); + if(!unlocked) + this.maxQueueTimeLock.readLock().unlock(); } return e.getElement(); } @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { - throw new NotImplementedException(); + ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit); + if(e != null) { + this.elementsRemoved.incrementAndGet(); + Long queueTime = e.getWaited(); + this.totalQueueTime.addAndGet(queueTime); + boolean unlocked = false; + try { + this.maxQueueTimeLock.readLock().lock(); + if (this.maxQueuedTime < queueTime) { + this.maxQueueTimeLock.readLock().unlock(); + unlocked = true; + try { + this.maxQueueTimeLock.writeLock().lock(); + this.maxQueuedTime = queueTime; + } finally { + this.maxQueueTimeLock.writeLock().unlock(); + } + } + } finally { + if(!unlocked) + this.maxQueueTimeLock.readLock().unlock(); + } + return e.getElement(); + } + return null; } @Override public int remainingCapacity() { - throw new NotImplementedException(); + return this.underlyingQueue.remainingCapacity(); } @Override public boolean remove(Object o) { - throw new NotImplementedException(); + try { + return this.underlyingQueue.remove(new ThroughputElement<E>((E) o)); + } catch (ClassCastException cce) { + return false; + } } @Override public boolean contains(Object o) { - throw new NotImplementedException(); + try { + return this.underlyingQueue.contains(new ThroughputElement<E>((E) o)); + } catch (ClassCastException cce) { + return false; + } } @Override @@ -200,12 +259,60 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public E remove() { - throw new NotImplementedException(); + ThroughputElement<E> e = this.underlyingQueue.remove(); + if(e != null) { + this.elementsRemoved.incrementAndGet(); + Long queueTime = e.getWaited(); + this.totalQueueTime.addAndGet(queueTime); + boolean unlocked = false; + try { + this.maxQueueTimeLock.readLock().lock(); + if (this.maxQueuedTime < queueTime) { + this.maxQueueTimeLock.readLock().unlock(); + unlocked = true; + try { + this.maxQueueTimeLock.writeLock().lock(); + this.maxQueuedTime = queueTime; + } finally { + this.maxQueueTimeLock.writeLock().unlock(); + } + } + } finally { + if(!unlocked) + this.maxQueueTimeLock.readLock().unlock(); + } + return e.getElement(); + } + return null; } @Override public E poll() { - throw new NotImplementedException(); + ThroughputElement<E> e = this.underlyingQueue.poll(); + if(e != null) { + this.elementsRemoved.incrementAndGet(); + Long queueTime = e.getWaited(); + this.totalQueueTime.addAndGet(queueTime); + boolean unlocked = false; + try { + this.maxQueueTimeLock.readLock().lock(); + if (this.maxQueuedTime < queueTime) { + this.maxQueueTimeLock.readLock().unlock(); + unlocked = true; + try { + this.maxQueueTimeLock.writeLock().lock(); + this.maxQueuedTime = queueTime; + } finally { + this.maxQueueTimeLock.writeLock().unlock(); + } + } + } finally { + if(!unlocked) + this.maxQueueTimeLock.readLock().unlock(); + } + return e.getElement(); + } + return null; } @Override @@ -215,7 +322,11 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public E peek() { - throw new NotImplementedException(); + ThroughputElement<E> e = this.underlyingQueue.peek(); + if( e != null) { + return e.getElement(); + } + return null; } @Override @@ -270,26 +381,27 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public long getCurrentSize() { - long size = -1; - try { - this.takeCountsLock.readLock().lock(); - size = this.elementsAdded.get() - this.elementsRemoved; - } finally { - this.takeCountsLock.readLock().unlock(); - } - return size; + return this.elementsAdded.get() - this.elementsRemoved.get(); } + /** + * If elements have been removed from the queue or no elements have been added, it returns the average wait time + * in milliseconds. If elements have been added, but none have been removed, it returns the time waited by the first + * element in the queue. + * + * @return the average wait time in milliseconds + */ @Override public double getAvgWait() { - double avg = -1.0; - try { - this.takeCountsLock.readLock().lock(); - avg = (double) this.totalQueueTime / (double) this.elementsRemoved; - } finally { - this.takeCountsLock.readLock().unlock(); + if (this.elementsRemoved.get() == 0) { + if (this.getCurrentSize() > 0) { + return this.underlyingQueue.peek().getWaited(); + } else { + return 0.0; + } + } else { + return (double) this.totalQueueTime.get() / (double) this.elementsRemoved.get(); } - return avg; } @Override @@ -297,28 +409,21 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe ThroughputElement<E> e = this.underlyingQueue.peek(); long max = -1; try { - this.takeCountsLock.readLock().lock(); + this.maxQueueTimeLock.readLock().lock(); if (e != null && e.getWaited() > this.maxQueuedTime) { max = e.getWaited(); } else { max = this.maxQueuedTime; } } finally { - this.takeCountsLock.readLock().unlock(); + this.maxQueueTimeLock.readLock().unlock(); } return max; } @Override public long getRemoved() { - long num = -1; - try { - this.takeCountsLock.readLock().lock(); - num = this.elementsRemoved; - } finally { - this.takeCountsLock.readLock().unlock(); - } - return num; + return this.elementsRemoved.get(); } @Override @@ -328,27 +433,20 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public double getThroughput() { - double tp = -1.0; - synchronized (this) { - try { - if(active) { - this.takeCountsLock.readLock().lock(); - tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0); - } - } finally { - this.takeCountsLock.readLock().unlock(); - } + if (active) { + return this.elementsRemoved.get() / ((System.currentTimeMillis() - this.startTime.get()) / 1000.0); } - return tp; + return 0.0; } /** * Element wrapper to measure time waiting on the queue + * * @param <E> */ private class ThroughputElement<E> { - + private long queuedTime; private E element; @@ -360,6 +458,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe /** * Get the time this element has been waiting on the queue. * current time - time element was queued + * * @return time this element has been waiting on the queue in milliseconds */ public long getWaited() { @@ -368,10 +467,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe /** * Get the queued element + * * @return the element */ public E getElement() { return this.element; } + + + /** + * Measures equality by the element and ignores the queued time + * @param obj + * @return + */ + @Override + public boolean equals(Object obj) { + if(obj instanceof ThroughputElement && obj != null) { + ThroughputElement that = (ThroughputElement) obj; + if(that.getElement() == null && this.getElement() == null) { + return true; + } else if(that.getElement() != null) { + return that.getElement().equals(this.getElement()); + } else { + return false; + } + } + return false; + } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java index 2be1aed..569ba5c 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java @@ -69,6 +69,72 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest { } /** + * Test that add and remove queue and dequeue data as expected + * and all measurements from the queue are returning data + */ + @Test + @Repeat(iterations = 3) + public void testAddAndRemove() { + ThroughputQueue<Integer> queue = new ThroughputQueue<>(); + int putCount = randomIntBetween(1, 1000); + for(int i=0; i < putCount; ++i) { + queue.add(i); + assertEquals(i+1, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + } + safeSleep(100); //ensure measurable wait time + int takeCount = randomIntBetween(1, putCount); + for(int i=0; i < takeCount; ++i) { + Integer element = queue.remove(); + assertNotNull(element); + assertEquals(i, element.intValue()); + assertEquals(putCount - (1+i), queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + } + assertEquals(putCount-takeCount, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + assertTrue(0.0 < queue.getMaxWait()); + assertTrue(0.0 < queue.getAvgWait()); + assertTrue(0.0 < queue.getThroughput()); + assertEquals(putCount, queue.getAdded()); + assertEquals(takeCount, queue.getRemoved()); + } + + /** + * Test that offer and poll queue and dequeue data as expected + * and all measurements from the queue are returning data + */ + @Test + @Repeat(iterations = 3) + public void testOfferAndPoll() { + ThroughputQueue<Integer> queue = new ThroughputQueue<>(); + int putCount = randomIntBetween(1, 1000); + for(int i=0; i < putCount; ++i) { + queue.offer(i); + assertEquals(i+1, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + } + safeSleep(100); //ensure measurable wait time + int takeCount = randomIntBetween(1, putCount); + for(int i=0; i < takeCount; ++i) { + Integer element = queue.poll(); + assertNotNull(element); + assertEquals(i, element.intValue()); + assertEquals(putCount - (1+i), queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + } + assertEquals(putCount-takeCount, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + assertTrue(0.0 < queue.getMaxWait()); + assertTrue(0.0 < queue.getAvgWait()); + assertTrue(0.0 < queue.getThroughput()); + assertEquals(putCount, queue.getAdded()); + assertEquals(takeCount, queue.getRemoved()); + } + + + + /** * Test that max wait and avg wait return expected values * @throws Exception */
