Repository: storm Updated Branches: refs/heads/master 3512ae97f -> 2aba1c42a
STORM-2621: add tuple_population metric Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4528a618 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4528a618 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4528a618 Branch: refs/heads/master Commit: 4528a61872d3a9709d1d9f6c66a9f15fc0810f2e Parents: ce19f81 Author: Robert (Bobby) Evans <[email protected]> Authored: Mon Jul 10 16:29:56 2017 -0500 Committer: Robert (Bobby) Evans <[email protected]> Committed: Thu Jul 20 13:33:18 2017 -0500 ---------------------------------------------------------------------- .../jvm/org/apache/storm/utils/DisruptorQueue.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4528a618/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 1d45087..3779505 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -352,15 +352,18 @@ public class DisruptorQueue implements IStatefulObject { long rp = readPos(); long wp = writePos(); + final long tuplePop = tuplePopulation.get(); + final double arrivalRateInSecs = _rateTracker.reportRate(); //Assume the queue is stable, in which the arrival rate is equal to the consumption rate. // If this assumption does not hold, the calculation of sojourn time should also consider // departure rate according to Queuing Theory. - final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0; + final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0; state.put("capacity", capacity()); state.put("population", wp - rp); + state.put("tuple_population", tuplePop); state.put("write_pos", wp); state.put("read_pos", rp); state.put("arrival_rate_secs", arrivalRateInSecs); @@ -372,6 +375,11 @@ public class DisruptorQueue implements IStatefulObject { public void notifyArrivals(long counts) { _rateTracker.notify(counts); + tuplePopulation.getAndAdd(counts); + } + + public void notifyDepartures(long counts) { + tuplePopulation.getAndAdd(-counts); } public void close() { @@ -393,6 +401,7 @@ public class DisruptorQueue implements IStatefulObject { private int _lowWaterMark = 0; private boolean _enableBackpressure = false; private final AtomicLong _overflowCount = new AtomicLong(0); + private final AtomicLong tuplePopulation = new AtomicLong(0); private volatile boolean _throttleOn = false; public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) { @@ -473,6 +482,7 @@ public class DisruptorQueue implements IStatefulObject { } else if (o == null) { LOG.error("NULL found in {}:{}", this.getName(), cursor); } else { + _metrics.notifyDepartures(getTupleCount(o)); handler.onEvent(o, curr, curr == cursor); if (_enableBackpressure && _cb != null && (_metrics.writePos() - curr + _overflowCount.get()) <= _lowWaterMark) { try { @@ -549,8 +559,8 @@ public class DisruptorQueue implements IStatefulObject { at++; numberOfTuples += getTupleCount(obj); } - _buffer.publish(begin, end); _metrics.notifyArrivals(numberOfTuples); + _buffer.publish(begin, end); } }
