Repository: storm Updated Branches: refs/heads/1.x-branch 0f31560ac -> 0efb94ce0
Merge branch 'STORM-2557' of https://github.com/revans2/incubator-storm into STORM-2621 STORM-2621: add tuple_population metric Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fb2b9e5d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fb2b9e5d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fb2b9e5d Branch: refs/heads/1.x-branch Commit: fb2b9e5d680c1ee4bc822787b4a4d2f0b06d4983 Parents: 0f31560 Author: Robert Evans <[email protected]> Authored: Fri Jul 21 10:40:39 2017 -0500 Committer: Robert Evans <[email protected]> Committed: Fri Jul 21 10:55:12 2017 -0500 ---------------------------------------------------------------------- .../jvm/org/apache/storm/utils/DisruptorQueue.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fb2b9e5d/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index fe90240..5fd4b84 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -352,15 +352,18 @@ public class DisruptorQueue implements IStatefulObject { long rp = readPos(); long wp = writePos(); + final long tuplePop = tuplePopulation.get(); + final double arrivalRateInSecs = _rateTracker.reportRate(); //Assume the queue is stable, in which the arrival rate is equal to the consumption rate. // If this assumption does not hold, the calculation of sojourn time should also consider // departure rate according to Queuing Theory. - final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0; + final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0; state.put("capacity", capacity()); state.put("population", wp - rp); + state.put("tuple_population", tuplePop); state.put("write_pos", wp); state.put("read_pos", rp); state.put("arrival_rate_secs", arrivalRateInSecs); @@ -372,6 +375,11 @@ public class DisruptorQueue implements IStatefulObject { public void notifyArrivals(long counts) { _rateTracker.notify(counts); + tuplePopulation.getAndAdd(counts); + } + + public void notifyDepartures(long counts) { + tuplePopulation.getAndAdd(-counts); } public void close() { @@ -393,6 +401,7 @@ public class DisruptorQueue implements IStatefulObject { private int _lowWaterMark = 0; private boolean _enableBackpressure = false; private final AtomicLong _overflowCount = new AtomicLong(0); + private final AtomicLong tuplePopulation = new AtomicLong(0); private volatile boolean _throttleOn = false; public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) { @@ -469,6 +478,7 @@ public class DisruptorQueue implements IStatefulObject { } else if (o == null) { LOG.error("NULL found in {}:{}", this.getName(), cursor); } else { + _metrics.notifyDepartures(getTupleCount(o)); handler.onEvent(o, curr, curr == cursor); if (_enableBackpressure && _cb != null && (_metrics.writePos() - curr + _overflowCount.get()) <= _lowWaterMark) { try { @@ -545,8 +555,8 @@ public class DisruptorQueue implements IStatefulObject { at++; numberOfTuples += getTupleCount(obj); } - _buffer.publish(begin, end); _metrics.notifyArrivals(numberOfTuples); + _buffer.publish(begin, end); } }
