Added processing time
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6916a1b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6916a1b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6916a1b7 Branch: refs/heads/master Commit: 6916a1b78318f7dec9d73121e25e1f6b545c9a78 Parents: c030d43 Author: Ryan Ebanks <[email protected]> Authored: Fri Oct 17 17:53:34 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Fri Oct 17 17:53:34 2014 -0500 ---------------------------------------------------------------------- .../local/counters/StreamsTaskCounter.java | 53 ++++++++++++++++++++ .../counters/StreamsTaskCounterMXBean.java | 10 ++++ 2 files changed, 63 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java index ffd9f25..e864219 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java @@ -17,6 +17,7 @@ */ package org.apache.streams.local.counters; +import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,11 +38,16 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{ private AtomicLong emitted; private AtomicLong received; private AtomicLong errors; + private AtomicLong totalTime; + @GuardedBy("this") + private volatile long maxTime; public StreamsTaskCounter(String id) { this.emitted = new AtomicLong(0); this.received = new AtomicLong(0); this.errors = new AtomicLong(0); + this.totalTime = new AtomicLong(0); + this.maxTime = -1; try { ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id)); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -52,30 +58,64 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{ } } + /** + * Increment emitted count + */ public void incrementEmittedCount() { this.incrementEmittedCount(1); } + /** + * Increment emitted count + * @param delta + */ public void incrementEmittedCount(long delta) { this.emitted.addAndGet(delta); } + /** + * Increment error count + */ public void incrementErrorCount() { this.incrementErrorCount(1); } + /** + * Increment error count + * @param delta + */ public void incrementErrorCount(long delta) { this.errors.addAndGet(delta); } + /** + * Increment received count + */ public void incrementReceivedCount() { this.incrementReceivedCount(1); } + /** + * Increment received count + * @param delta + */ public void incrementReceivedCount(long delta) { this.received.addAndGet(delta); } + /** + * Add the time it takes to process a single datum in milliseconds + * @param processTime + */ + public void addTime(long processTime) { + synchronized (this) { + if(processTime > this.maxTime) { + this.maxTime = processTime; + } + } + this.totalTime.addAndGet(processTime); + } + @Override public double getErrorRate() { if(this.received.get() == 0) { @@ -98,4 +138,17 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{ public long getNumUnhandledErrors() { return this.errors.get(); } + + @Override + public double getAvgTime() { + if(this.received.get() == 0) { + return 0.0; + } + return this.totalTime.get() / (double) this.received.get(); + } + + @Override + public long getMaxTime() { + return this.maxTime; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java index 634857d..8ac2e33 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java @@ -48,6 +48,16 @@ public interface StreamsTaskCounterMXBean { */ public long getNumUnhandledErrors(); + /** + * Returns the average time in milliseconds it takes the task to readCurrent, process, or write to return. + * @return + */ + public double getAvgTime(); + /** + * Returns the max time in milliseconds it takes the task to readCurrent, process, or write to return. + * @return + */ + public long getMaxTime(); }
