STREAMS-190 | Changed private variable elementsAdded to an AtomicLong (removing putCountsLock) since it is only accessed through atomic operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3f65e5c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3f65e5c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3f65e5c3 Branch: refs/heads/master Commit: 3f65e5c35fb978ba5ae393ef737e479be59960bb Parents: e82c47c Author: Ryan Ebanks <[email protected]> Authored: Wed Oct 8 16:06:32 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Wed Oct 8 16:06:32 2014 -0500 ---------------------------------------------------------------------- .../streams/local/queues/ThroughputQueue.java | 38 ++++---------------- .../queues/ThroughputQueueSingleThreadTest.java | 7 ++-- 2 files changed, 11 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java index b280dbf..99c8cbf 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -48,10 +49,8 @@ public class ThroughputQueue<E> 
implements BlockingQueue<E>, ThroughputQueueMXBe private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class); private BlockingQueue<ThroughputElement<E>> underlyingQueue; - private ReadWriteLock putCountsLock; private ReadWriteLock takeCountsLock; - @GuardedBy("putCountsLock") - private long elementsAdded; + private AtomicLong elementsAdded; @GuardedBy("takeCountsLock") private long elementsRemoved; @GuardedBy("this") @@ -96,10 +95,9 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe } else { this.underlyingQueue = new LinkedBlockingQueue<>(maxSize); } - this.elementsAdded = 0; + this.elementsAdded = new AtomicLong(0); this.elementsRemoved = 0; this.startTime = -1; - this.putCountsLock = new ReentrantReadWriteLock(); this.takeCountsLock = new ReentrantReadWriteLock(); this.active = false; this.maxQueuedTime = -1; @@ -128,12 +126,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public void put(E e) throws InterruptedException { this.underlyingQueue.put(new ThroughputElement<E>(e)); - try { - this.putCountsLock.writeLock().lockInterruptibly(); - ++this.elementsAdded; - } finally { - this.putCountsLock.writeLock().unlock(); - } + this.elementsAdded.incrementAndGet(); synchronized (this) { if (!this.active) { this.startTime = System.currentTimeMillis(); @@ -146,12 +139,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) { - try { - this.putCountsLock.writeLock().lockInterruptibly(); - ++this.elementsAdded; - } finally { - this.putCountsLock.writeLock().unlock(); - } + this.elementsAdded.incrementAndGet(); synchronized (this) { if (!this.active) { this.startTime = System.currentTimeMillis(); @@ -283,17 +271,12 @@ public class ThroughputQueue<E> implements 
BlockingQueue<E>, ThroughputQueueMXBe @Override public long getCurrentSize() { long size = -1; - try { - this.putCountsLock.readLock().lock(); try { this.takeCountsLock.readLock().lock(); - size = this.elementsAdded - this.elementsRemoved; + size = this.elementsAdded.get() - this.elementsRemoved; } finally { this.takeCountsLock.readLock().unlock(); } - } finally { - this.putCountsLock.readLock().unlock(); - } return size; } @@ -340,14 +323,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public long getAdded() { - long num = -1; - try { - this.putCountsLock.readLock().lock(); - num = this.elementsAdded; - } finally { - this.putCountsLock.readLock().unlock(); - } - return num; + return this.elementsAdded.get(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java index 2ee0008..2be1aed 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java @@ -50,6 +50,7 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest { assertEquals(i+1, queue.size()); assertEquals(queue.size(), queue.getCurrentSize()); } + safeSleep(100); //ensure measurable wait time int takeCount = randomIntBetween(1, putCount); for(int i=0; i < takeCount; ++i) { Integer element = queue.take(); @@ -60,9 +61,9 @@ public class ThroughputQueueSingleThreadTest extends 
RandomizedTest { } assertEquals(putCount-takeCount, queue.size()); assertEquals(queue.size(), queue.getCurrentSize()); - assertTrue(0 < queue.getMaxWait()); - assertTrue(0 < queue.getAvgWait()); - assertTrue(0 < queue.getThroughput()); + assertTrue(0.0 < queue.getMaxWait()); + assertTrue(0.0 < queue.getAvgWait()); + assertTrue(0.0 < queue.getThroughput()); assertEquals(putCount, queue.getAdded()); assertEquals(takeCount, queue.getRemoved()); }
