Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch ebbc4d778 -> d83b0b971


Merge branch 'STORM-2557' of https://github.com/revans2/incubator-storm into STORM-2621

STORM-2621: add tuple_population metric


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7b72b68a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7b72b68a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7b72b68a

Branch: refs/heads/1.1.x-branch
Commit: 7b72b68a6466ebe3dc7c17ab13f9f53621a0fde4
Parents: ebbc4d7
Author: Robert Evans <[email protected]>
Authored: Fri Jul 21 10:40:39 2017 -0500
Committer: Robert Evans <[email protected]>
Committed: Fri Jul 21 11:12:56 2017 -0500

----------------------------------------------------------------------
 .../jvm/org/apache/storm/utils/DisruptorQueue.java    | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7b72b68a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index fe90240..5fd4b84 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -352,15 +352,18 @@ public class DisruptorQueue implements IStatefulObject {
             long rp = readPos();
             long wp = writePos();
 
+            final long tuplePop = tuplePopulation.get();
+
             final double arrivalRateInSecs = _rateTracker.reportRate();
 
             //Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
             // If this assumption does not hold, the calculation of sojourn time should also consider
             // departure rate according to Queuing Theory.
-            final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+            final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
 
             state.put("capacity", capacity());
             state.put("population", wp - rp);
+            state.put("tuple_population", tuplePop);
             state.put("write_pos", wp);
             state.put("read_pos", rp);
             state.put("arrival_rate_secs", arrivalRateInSecs);
@@ -372,6 +375,11 @@ public class DisruptorQueue implements IStatefulObject {
 
         public void notifyArrivals(long counts) {
             _rateTracker.notify(counts);
+            tuplePopulation.getAndAdd(counts);
+        }
+
+        public void notifyDepartures(long counts) {
+            tuplePopulation.getAndAdd(-counts);
         }
 
         public void close() {
@@ -393,6 +401,7 @@ public class DisruptorQueue implements IStatefulObject {
     private int _lowWaterMark = 0;
     private boolean _enableBackpressure = false;
     private final AtomicLong _overflowCount = new AtomicLong(0);
+    private final AtomicLong tuplePopulation = new AtomicLong(0);
     private volatile boolean _throttleOn = false;
 
     public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
@@ -469,6 +478,7 @@ public class DisruptorQueue implements IStatefulObject {
                 } else if (o == null) {
                     LOG.error("NULL found in {}:{}", this.getName(), cursor);
                 } else {
+                    _metrics.notifyDepartures(getTupleCount(o));
                     handler.onEvent(o, curr, curr == cursor);
                     if (_enableBackpressure && _cb != null && (_metrics.writePos() - curr + _overflowCount.get()) <= _lowWaterMark) {
                         try {
@@ -545,8 +555,8 @@ public class DisruptorQueue implements IStatefulObject {
                 at++;
                 numberOfTuples += getTupleCount(obj);
             }
-            _buffer.publish(begin, end);
             _metrics.notifyArrivals(numberOfTuples);
+            _buffer.publish(begin, end);
         }
     }
 

Reply via email to