Repository: incubator-streams Updated Branches: refs/heads/master a973ba217 -> 6dd1ea51c
STREAMS-190 | Refactored to remove repeated code Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/90b0bfcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/90b0bfcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/90b0bfcb Branch: refs/heads/master Commit: 90b0bfcb0b9de107412220d65e438edca26612e7 Parents: 126a34f Author: Ryan Ebanks <[email protected]> Authored: Fri Oct 10 17:01:23 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Fri Oct 10 17:01:23 2014 -0500 ---------------------------------------------------------------------- .../streams/local/queues/ThroughputQueue.java | 158 ++++++------------- 1 file changed, 50 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/90b0bfcb/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java index aacecc8..de1add3 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java @@ -115,13 +115,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public boolean add(E e) { if (this.underlyingQueue.add(new ThroughputElement<E>(e))) { - this.elementsAdded.incrementAndGet(); - synchronized (this) { - if (!this.active) { - this.startTime.set(System.currentTimeMillis()); - this.active = true; - } - } + internalAddElement(); return true; } return false; @@ -130,13 +124,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public boolean offer(E e) { if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) { - this.elementsAdded.incrementAndGet(); - synchronized (this) { - if (!this.active) { - this.startTime.set(System.currentTimeMillis()); - this.active = true; - } - } + internalAddElement(); return true; } return false; @@ -145,25 +133,13 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public void put(E e) throws InterruptedException { this.underlyingQueue.put(new ThroughputElement<E>(e)); - this.elementsAdded.incrementAndGet(); - synchronized (this) { - if (!this.active) { - this.startTime.set(System.currentTimeMillis()); - this.active = true; - } - } + internalAddElement(); } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) { - this.elementsAdded.incrementAndGet(); - synchronized (this) { - if (!this.active) { - this.startTime.set(System.currentTimeMillis()); - this.active = true; - } - } + internalAddElement(); return true; } return false; @@ -172,26 +148,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe @Override public E take() throws InterruptedException { ThroughputElement<E> e = this.underlyingQueue.take(); - this.elementsRemoved.incrementAndGet(); - Long queueTime = e.getWaited(); - this.totalQueueTime.addAndGet(queueTime); - boolean unlocked = false; - try { - this.maxQueueTimeLock.readLock().lock(); - if (this.maxQueuedTime < queueTime) { - this.maxQueueTimeLock.readLock().unlock(); - unlocked = true; - try { - this.maxQueueTimeLock.writeLock().lock(); - this.maxQueuedTime = queueTime; - } finally { - this.maxQueueTimeLock.writeLock().unlock(); - } - } - } finally { - if(!unlocked) - this.maxQueueTimeLock.readLock().unlock(); - } + internalRemoveElement(e); return e.getElement(); } @@ -199,26 +156,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe public E poll(long timeout, TimeUnit unit) throws InterruptedException { ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit); if(e != null) { - this.elementsRemoved.incrementAndGet(); - Long queueTime = e.getWaited(); - this.totalQueueTime.addAndGet(queueTime); - boolean unlocked = false; - try { - this.maxQueueTimeLock.readLock().lock(); - if (this.maxQueuedTime < queueTime) { - this.maxQueueTimeLock.readLock().unlock(); - unlocked = true; - try { - this.maxQueueTimeLock.writeLock().lock(); - this.maxQueuedTime = queueTime; - } finally { - this.maxQueueTimeLock.writeLock().unlock(); - } - } - } finally { - if(!unlocked) - this.maxQueueTimeLock.readLock().unlock(); - } + internalRemoveElement(e); return e.getElement(); } return null; @@ -261,26 +199,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe public E remove() { ThroughputElement<E> e = this.underlyingQueue.remove(); if(e != null) { - this.elementsRemoved.incrementAndGet(); - Long queueTime = e.getWaited(); - this.totalQueueTime.addAndGet(queueTime); - boolean unlocked = false; - try { - this.maxQueueTimeLock.readLock().lock(); - if (this.maxQueuedTime < queueTime) { - this.maxQueueTimeLock.readLock().unlock(); - unlocked = true; - try { - this.maxQueueTimeLock.writeLock().lock(); - this.maxQueuedTime = queueTime; - } finally { - this.maxQueueTimeLock.writeLock().unlock(); - } - } - } finally { - if(!unlocked) - this.maxQueueTimeLock.readLock().unlock(); - } + internalRemoveElement(e); return e.getElement(); } return null; @@ -290,26 +209,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe public E poll() { ThroughputElement<E> e = this.underlyingQueue.poll(); if(e != null) { - this.elementsRemoved.incrementAndGet(); - Long queueTime = e.getWaited(); - this.totalQueueTime.addAndGet(queueTime); - boolean unlocked = false; - try { - this.maxQueueTimeLock.readLock().lock(); - if (this.maxQueuedTime < queueTime) { - this.maxQueueTimeLock.readLock().unlock(); - unlocked = true; - try { - this.maxQueueTimeLock.writeLock().lock(); - this.maxQueuedTime = queueTime; - } finally { - this.maxQueueTimeLock.writeLock().unlock(); - } - } - } finally { - if(!unlocked) - this.maxQueueTimeLock.readLock().unlock(); - } + internalRemoveElement(e); return e.getElement(); } return null; @@ -439,6 +339,48 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe return 0.0; } + /** + * Handles updating the stats whenever elements are added to the queue + */ + private void internalAddElement() { + this.elementsAdded.incrementAndGet(); + synchronized (this) { + if (!this.active) { + this.startTime.set(System.currentTimeMillis()); + this.active = true; + } + } + } + + /** + * Handle updating the stats whenever elements are removed from the queue + * @param e Element removed + */ + private void internalRemoveElement(ThroughputElement<E> e) { + if(e != null) { + this.elementsRemoved.incrementAndGet(); + Long queueTime = e.getWaited(); + this.totalQueueTime.addAndGet(queueTime); + boolean unlocked = false; + try { + this.maxQueueTimeLock.readLock().lock(); + if (this.maxQueuedTime < queueTime) { + this.maxQueueTimeLock.readLock().unlock(); + unlocked = true; + try { + this.maxQueueTimeLock.writeLock().lock(); + this.maxQueuedTime = queueTime; + } finally { + this.maxQueueTimeLock.writeLock().unlock(); + } + } + } finally { + if (!unlocked) + this.maxQueueTimeLock.readLock().unlock(); + } + } + } + /** * Element wrapper to measure time waiting on the queue
