This is an automated email from the ASF dual-hosted git repository. karthikz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new d669d06 Metrics added to measure the rate at which tuples are added to the queue at spout gateway (#2981) d669d06 is described below commit d669d0615198ce36a8cdebadd184ba4c3bcc9e49 Author: Faria Kalim <faria.ka...@gmail.com> AuthorDate: Fri Aug 17 08:37:51 2018 -0700 Metrics added to measure the rate at which tuples are added to the queue at spout gateway (#2981) * added metrics to measure tuples added to outgoing queue per min * added metric to full metrics instance * added cumulative outgoing queue size * measuring number of tuples per tupleset * added comment justifying use of metrics * removed extra comment --- .../heron/common/utils/metrics/BoltMetrics.java | 10 +++++++++- .../common/utils/metrics/ComponentMetrics.java | 2 +- .../common/utils/metrics/FullBoltMetrics.java | 9 ++++++++- .../common/utils/metrics/FullSpoutMetrics.java | 22 ++++++++++++++++++++-- .../heron/common/utils/metrics/SpoutMetrics.java | 8 ++++++++ .../heron/instance/AbstractOutputCollector.java | 2 +- .../heron/instance/OutgoingTupleCollection.java | 8 +++++++- .../org/apache/heron/metrics/GatewayMetrics.java | 1 - 8 files changed, 54 insertions(+), 8 deletions(-) diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java index 13006fb..bd40180 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java +++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java @@ -44,7 +44,7 @@ public class BoltMetrics implements ComponentMetrics { private final CountMetric failCount; private final CountMetric executeCount; private final ReducedMetric<MeanReducerState, Number, Double> executeLatency; - + private final CountMetric tupleAddedToQueue; // Time in nano-seconds spending in execute() at every interval private final CountMetric emitCount; @@ -53,6 +53,7 @@ public class BoltMetrics implements ComponentMetrics { private final CountMetric outQueueFullCount; + public BoltMetrics() { ackCount = new CountMetric(); processLatency = new ReducedMetric<>(new MeanReducer()); @@ -62,6 +63,7 @@ public class BoltMetrics implements ComponentMetrics { executeLatency = new ReducedMetric<>(new MeanReducer()); emitCount = new CountMetric(); outQueueFullCount = new CountMetric(); + tupleAddedToQueue = new CountMetric(); } public void registerMetrics(TopologyContextImpl topologyContext) { @@ -78,6 +80,8 @@ public class BoltMetrics implements ComponentMetrics { topologyContext.registerMetric("__execute-latency/default", executeLatency, interval); topologyContext.registerMetric("__emit-count/default", emitCount, interval); topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval); + topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default", + tupleAddedToQueue, interval); } // For MultiCountMetrics, we need to set the default value for all streams. @@ -109,6 +113,10 @@ public class BoltMetrics implements ComponentMetrics { emitCount.incr(); } + public void addTupleToQueue(int size) { + tupleAddedToQueue.incr(); + } + public void updateOutQueueFullCount() { outQueueFullCount.incr(); } diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java index 81728f6..d9e3b23 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java +++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java @@ -28,7 +28,7 @@ import org.apache.heron.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public interface ComponentMetrics { - void serializeDataTuple(String streamId, long latency); void emittedTuple(String streamId); + void addTupleToQueue(int size); } diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java index 84700d3..dfda1c5 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java +++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java @@ -52,6 +52,7 @@ public class FullBoltMetrics extends BoltMetrics { // Time in nano-seconds spending in execute() at every interval private final MultiCountMetric executeTimeNs; private final MultiCountMetric emitCount; + private final CountMetric tupleAddedToQueue; private final MultiCountMetric totalDeserializationTimeNs; private final MultiCountMetric totalSerializationTimeNs; private final MultiReducedMetric<MeanReducerState, Number, Double> averageSerializationTimeNs; @@ -61,7 +62,6 @@ public class FullBoltMetrics extends BoltMetrics { // so instance could not produce more tuples private final CountMetric outQueueFullCount; - public FullBoltMetrics() { ackCount = new MultiCountMetric(); processLatency = new MultiReducedMetric<>(new MeanReducer()); @@ -72,6 +72,7 @@ public class FullBoltMetrics extends BoltMetrics { executeTimeNs = new MultiCountMetric(); emitCount = new MultiCountMetric(); outQueueFullCount = new CountMetric(); + tupleAddedToQueue = new CountMetric(); totalDeserializationTimeNs = new MultiCountMetric(); totalSerializationTimeNs = new MultiCountMetric(); @@ -103,6 +104,8 @@ public class FullBoltMetrics extends BoltMetrics { "__av-tuple-deserialization-time-ns", totalDeserializationTimeNs, interval); topologyContext.registerMetric( "__av-tuple-serialization-time-ns", totalSerializationTimeNs, interval); + topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default", + tupleAddedToQueue, interval); } // For MultiCountMetrics, we need to set the default value for all streams. @@ -179,6 +182,10 @@ public class FullBoltMetrics extends BoltMetrics { emitCount.scope(streamId).incr(); } + public void addTupleToQueue(int size) { + tupleAddedToQueue.incr(); + } + public void updateOutQueueFullCount() { outQueueFullCount.incr(); } diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java index 412b858..866bdf2 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java +++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java @@ -45,6 +45,7 @@ import org.apache.heron.common.utils.topology.TopologyContextImpl; public class FullSpoutMetrics extends SpoutMetrics { private final MultiCountMetric ackCount; + private final ReducedMetric<MeanReducerState, Number, Double> tupleSize; private final MultiReducedMetric<MeanReducerState, Number, Double> completeLatency; private final MultiReducedMetric<MeanReducerState, Number, Double> failLatency; private final MultiCountMetric failCount; @@ -53,7 +54,7 @@ public class FullSpoutMetrics extends SpoutMetrics { private final ReducedMetric<MeanReducerState, Number, Double> nextTupleLatency; private final CountMetric nextTupleCount; private final MultiCountMetric serializationTimeNs; - + private final CountMetric tupleAddedToQueue; // The # of times back-pressure happens on outStreamQueue so instance could not // produce more tuples private final CountMetric outQueueFullCount; @@ -73,6 +74,8 @@ public class FullSpoutMetrics extends SpoutMetrics { outQueueFullCount = new CountMetric(); pendingTuplesCount = new ReducedMetric<>(new MeanReducer()); serializationTimeNs = new MultiCountMetric(); + tupleAddedToQueue = new CountMetric(); + tupleSize = new ReducedMetric<>(new MeanReducer()); } public void registerMetrics(TopologyContextImpl topologyContext) { @@ -91,7 +94,17 @@ public class FullSpoutMetrics extends SpoutMetrics { topologyContext.registerMetric("__next-tuple-count", nextTupleCount, interval); topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval); topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval); - topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs, interval); + topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs, + interval); + + // The following metrics measure the rate at which tuples are added to the outgoing + // queues at spouts and the sizes of these queues. This allows us to measure whether + // the gateway thread pulls out tuples from the queue fast enough, thereby preventing + // the spout from becoming a bottleneck. + topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default", + tupleAddedToQueue, interval); + topologyContext.registerMetric("__average-tuple-size-added-queue/default", + tupleSize, interval); } // For MultiCountMetrics, we need to set the default value for all streams. @@ -135,6 +148,11 @@ public class FullSpoutMetrics extends SpoutMetrics { nextTupleCount.incr(); } + public void addTupleToQueue(int size) { + tupleAddedToQueue.incr(); + tupleSize.update(size); + } + public void updateOutQueueFullCount() { outQueueFullCount.incr(); } diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java index 7ff67d0..cbda4c9 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java +++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java @@ -47,6 +47,7 @@ public class SpoutMetrics implements ComponentMetrics { private final CountMetric emitCount; private final ReducedMetric<MeanReducerState, Number, Double> nextTupleLatency; private final CountMetric nextTupleCount; + private final CountMetric tupleAddedToQueue; // The # of times back-pressure happens on outStreamQueue so instance could not // produce more tuples @@ -66,6 +67,7 @@ public class SpoutMetrics implements ComponentMetrics { nextTupleCount = new CountMetric(); outQueueFullCount = new CountMetric(); pendingTuplesCount = new ReducedMetric<>(new MeanReducer()); + tupleAddedToQueue = new CountMetric(); } public void registerMetrics(TopologyContextImpl topologyContext) { @@ -84,6 +86,8 @@ public class SpoutMetrics implements ComponentMetrics { topologyContext.registerMetric("__next-tuple-count", nextTupleCount, interval); topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval); topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval); + topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default", + tupleAddedToQueue, interval); } // For MultiCountMetrics, we need to set the default value for all streams. @@ -114,6 +118,10 @@ public class SpoutMetrics implements ComponentMetrics { emitCount.incr(); } + public void addTupleToQueue(int size) { + tupleAddedToQueue.incr(); + } + public void nextTuple(long latency) { nextTupleLatency.update(latency); nextTupleCount.incr(); diff --git a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java index 08afa61..3e22ac8 100644 --- a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java +++ b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java @@ -81,7 +81,7 @@ public class AbstractOutputCollector { } } - this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock); + this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock, metrics); } public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) { diff --git a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java index 862c300..a3c1e84 100644 --- a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java +++ b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java @@ -35,6 +35,7 @@ import org.apache.heron.common.basics.Communicator; import org.apache.heron.common.basics.FileUtils; import org.apache.heron.common.basics.SingletonRegistry; import org.apache.heron.common.config.SystemConfig; +import org.apache.heron.common.utils.metrics.ComponentMetrics; import org.apache.heron.common.utils.misc.PhysicalPlanHelper; import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper; import org.apache.heron.proto.ckptmgr.CheckpointManager; @@ -71,12 +72,16 @@ public class OutgoingTupleCollection { private final ReentrantLock lock; + protected final ComponentMetrics metrics; + public OutgoingTupleCollection( PhysicalPlanHelper helper, Communicator<Message> outQueue, - ReentrantLock lock) { + ReentrantLock lock, + ComponentMetrics metrics) { this.outQueue = outQueue; this.helper = helper; + this.metrics = metrics; SystemConfig systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG); @@ -238,6 +243,7 @@ public class OutgoingTupleCollection { bldr.setData(currentDataTuple); pushTupleToQueue(bldr, outQueue); + metrics.addTupleToQueue(currentDataTuple.getTuplesCount()); currentDataTuple = null; } diff --git a/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java b/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java index 64b11bb..8bb40c3 100644 --- a/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java +++ b/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java @@ -46,7 +46,6 @@ public class GatewayMetrics { private final CountMetric sentMetricsSize; private final CountMetric sentMetricsCount; private final CountMetric sentExceptionsCount; - // The # of items in inStreamQueue private final ReducedMetric<MeanReducerState, Number, Double> inStreamQueueSize; // The # of items in outStreamQueue