This is an automated email from the ASF dual-hosted git repository.

karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new d669d06  Metrics added to measure the rate at which tuples are added to the queue at spout gateway (#2981)
d669d06 is described below

commit d669d0615198ce36a8cdebadd184ba4c3bcc9e49
Author: Faria Kalim <faria.ka...@gmail.com>
AuthorDate: Fri Aug 17 08:37:51 2018 -0700

    Metrics added to measure the rate at which tuples are added to the queue at spout gateway (#2981)
    
    * added metrics to measure tuples added to outgoing queue per min
    
    * added metric to full metrics instance
    
    * added cumulative outgoing queue size
    
    * measuring number of tuples per tupleset
    
    * added comment justifying use of metrics
    
    * removed extra comment
---
 .../heron/common/utils/metrics/BoltMetrics.java    | 10 +++++++++-
 .../common/utils/metrics/ComponentMetrics.java     |  2 +-
 .../common/utils/metrics/FullBoltMetrics.java      |  9 ++++++++-
 .../common/utils/metrics/FullSpoutMetrics.java     | 22 ++++++++++++++++++++--
 .../heron/common/utils/metrics/SpoutMetrics.java   |  8 ++++++++
 .../heron/instance/AbstractOutputCollector.java    |  2 +-
 .../heron/instance/OutgoingTupleCollection.java    |  8 +++++++-
 .../org/apache/heron/metrics/GatewayMetrics.java   |  1 -
 8 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
index 13006fb..bd40180 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
@@ -44,7 +44,7 @@ public class BoltMetrics implements ComponentMetrics {
   private final CountMetric failCount;
   private final CountMetric executeCount;
   private final ReducedMetric<MeanReducerState, Number, Double> executeLatency;
-
+  private final CountMetric tupleAddedToQueue;
   // Time in nano-seconds spending in execute() at every interval
   private final CountMetric emitCount;
 
@@ -53,6 +53,7 @@ public class BoltMetrics implements ComponentMetrics {
   private final CountMetric outQueueFullCount;
 
 
+
   public BoltMetrics() {
     ackCount = new CountMetric();
     processLatency = new ReducedMetric<>(new MeanReducer());
@@ -62,6 +63,7 @@ public class BoltMetrics implements ComponentMetrics {
     executeLatency = new ReducedMetric<>(new MeanReducer());
     emitCount = new CountMetric();
     outQueueFullCount = new CountMetric();
+    tupleAddedToQueue = new CountMetric();
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -78,6 +80,8 @@ public class BoltMetrics implements ComponentMetrics {
     topologyContext.registerMetric("__execute-latency/default", executeLatency, interval);
     topologyContext.registerMetric("__emit-count/default", emitCount, interval);
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
+    topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+        tupleAddedToQueue, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -109,6 +113,10 @@ public class BoltMetrics implements ComponentMetrics {
     emitCount.incr();
   }
 
+  public void addTupleToQueue(int size) {
+    tupleAddedToQueue.incr();
+  }
+
   public void updateOutQueueFullCount() {
     outQueueFullCount.incr();
   }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
index 81728f6..d9e3b23 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
@@ -28,7 +28,7 @@ import org.apache.heron.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface ComponentMetrics {
-
   void serializeDataTuple(String streamId, long latency);
   void emittedTuple(String streamId);
+  void addTupleToQueue(int size);
 }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
index 84700d3..dfda1c5 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
@@ -52,6 +52,7 @@ public class FullBoltMetrics extends BoltMetrics {
   // Time in nano-seconds spending in execute() at every interval
   private final MultiCountMetric executeTimeNs;
   private final MultiCountMetric emitCount;
+  private final CountMetric tupleAddedToQueue;
   private final MultiCountMetric totalDeserializationTimeNs;
   private final MultiCountMetric totalSerializationTimeNs;
   private final MultiReducedMetric<MeanReducerState, Number, Double> averageSerializationTimeNs;
@@ -61,7 +62,6 @@ public class FullBoltMetrics extends BoltMetrics {
   // so instance could not produce more tuples
   private final CountMetric outQueueFullCount;
 
-
   public FullBoltMetrics() {
     ackCount = new MultiCountMetric();
     processLatency = new MultiReducedMetric<>(new MeanReducer());
@@ -72,6 +72,7 @@ public class FullBoltMetrics extends BoltMetrics {
     executeTimeNs = new MultiCountMetric();
     emitCount = new MultiCountMetric();
     outQueueFullCount = new CountMetric();
+    tupleAddedToQueue = new CountMetric();
 
     totalDeserializationTimeNs = new MultiCountMetric();
     totalSerializationTimeNs = new MultiCountMetric();
@@ -103,6 +104,8 @@ public class FullBoltMetrics extends BoltMetrics {
         "__av-tuple-deserialization-time-ns", totalDeserializationTimeNs, interval);
     topologyContext.registerMetric(
         "__av-tuple-serialization-time-ns", totalSerializationTimeNs, interval);
+    topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+        tupleAddedToQueue, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -179,6 +182,10 @@ public class FullBoltMetrics extends BoltMetrics {
     emitCount.scope(streamId).incr();
   }
 
+  public void addTupleToQueue(int size) {
+    tupleAddedToQueue.incr();
+  }
+
   public void updateOutQueueFullCount() {
     outQueueFullCount.incr();
   }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
index 412b858..866bdf2 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
@@ -45,6 +45,7 @@ import org.apache.heron.common.utils.topology.TopologyContextImpl;
 
 public class FullSpoutMetrics extends SpoutMetrics {
   private final MultiCountMetric ackCount;
+  private final ReducedMetric<MeanReducerState, Number, Double> tupleSize;
   private final MultiReducedMetric<MeanReducerState, Number, Double> completeLatency;
   private final MultiReducedMetric<MeanReducerState, Number, Double> failLatency;
   private final MultiCountMetric failCount;
@@ -53,7 +54,7 @@ public class FullSpoutMetrics extends SpoutMetrics {
   private final ReducedMetric<MeanReducerState, Number, Double> nextTupleLatency;
   private final CountMetric nextTupleCount;
   private final MultiCountMetric serializationTimeNs;
-
+  private final CountMetric tupleAddedToQueue;
   // The # of times back-pressure happens on outStreamQueue so instance could not
   // produce more tuples
   private final CountMetric outQueueFullCount;
@@ -73,6 +74,8 @@ public class FullSpoutMetrics extends SpoutMetrics {
     outQueueFullCount = new CountMetric();
     pendingTuplesCount = new ReducedMetric<>(new MeanReducer());
     serializationTimeNs = new MultiCountMetric();
+    tupleAddedToQueue = new CountMetric();
+    tupleSize = new ReducedMetric<>(new MeanReducer());
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -91,7 +94,17 @@ public class FullSpoutMetrics extends SpoutMetrics {
     topologyContext.registerMetric("__next-tuple-count", nextTupleCount, interval);
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
     topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval);
-    topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs, interval);
+    topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs,
+        interval);
+
+    // The following metrics measure the rate at which tuples are added to the outgoing
+    // queues at spouts and the sizes of these queues. This allows us to measure whether
+    // the gateway thread pulls out tuples from the queue fast enough, thereby preventing
+    // the spout from becoming a bottleneck.
+    topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+        tupleAddedToQueue, interval);
+    topologyContext.registerMetric("__average-tuple-size-added-queue/default",
+        tupleSize, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -135,6 +148,11 @@ public class FullSpoutMetrics extends SpoutMetrics {
     nextTupleCount.incr();
   }
 
+  public void addTupleToQueue(int size) {
+    tupleAddedToQueue.incr();
+    tupleSize.update(size);
+  }
+
   public void updateOutQueueFullCount() {
     outQueueFullCount.incr();
   }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
index 7ff67d0..cbda4c9 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
@@ -47,6 +47,7 @@ public class SpoutMetrics implements ComponentMetrics {
   private final CountMetric emitCount;
   private final ReducedMetric<MeanReducerState, Number, Double> nextTupleLatency;
   private final CountMetric nextTupleCount;
+  private final CountMetric tupleAddedToQueue;
 
   // The # of times back-pressure happens on outStreamQueue so instance could not
   // produce more tuples
@@ -66,6 +67,7 @@ public class SpoutMetrics implements ComponentMetrics {
     nextTupleCount = new CountMetric();
     outQueueFullCount = new CountMetric();
     pendingTuplesCount = new ReducedMetric<>(new MeanReducer());
+    tupleAddedToQueue = new CountMetric();
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -84,6 +86,8 @@ public class SpoutMetrics implements ComponentMetrics {
     topologyContext.registerMetric("__next-tuple-count", nextTupleCount, interval);
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
     topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval);
+    topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+        tupleAddedToQueue, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -114,6 +118,10 @@ public class SpoutMetrics implements ComponentMetrics {
     emitCount.incr();
   }
 
+  public void addTupleToQueue(int size) {
+    tupleAddedToQueue.incr();
+  }
+
   public void nextTuple(long latency) {
     nextTupleLatency.update(latency);
     nextTupleCount.incr();
diff --git a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
index 08afa61..3e22ac8 100644
--- a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
+++ b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
@@ -81,7 +81,7 @@ public class AbstractOutputCollector {
       }
     }
 
-    this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock);
+    this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock, metrics);
   }
 
   public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
diff --git a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
index 862c300..a3c1e84 100644
--- a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
+++ b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
@@ -35,6 +35,7 @@ import org.apache.heron.common.basics.Communicator;
 import org.apache.heron.common.basics.FileUtils;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.config.SystemConfig;
+import org.apache.heron.common.utils.metrics.ComponentMetrics;
 import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
 import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
@@ -71,12 +72,16 @@ public class OutgoingTupleCollection {
 
   private final ReentrantLock lock;
 
+  protected final ComponentMetrics metrics;
+
   public OutgoingTupleCollection(
       PhysicalPlanHelper helper,
       Communicator<Message> outQueue,
-      ReentrantLock lock) {
+      ReentrantLock lock,
+      ComponentMetrics metrics) {
     this.outQueue = outQueue;
     this.helper = helper;
+    this.metrics = metrics;
     SystemConfig systemConfig =
         (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
 
@@ -238,6 +243,7 @@ public class OutgoingTupleCollection {
       bldr.setData(currentDataTuple);
 
       pushTupleToQueue(bldr, outQueue);
+      metrics.addTupleToQueue(currentDataTuple.getTuplesCount());
 
       currentDataTuple = null;
     }
diff --git a/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java b/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
index 64b11bb..8bb40c3 100644
--- a/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
+++ b/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
@@ -46,7 +46,6 @@ public class GatewayMetrics {
   private final CountMetric sentMetricsSize;
   private final CountMetric sentMetricsCount;
   private final CountMetric sentExceptionsCount;
-
   // The # of items in inStreamQueue
   private final ReducedMetric<MeanReducerState, Number, Double> inStreamQueueSize;
   // The # of items in outStreamQueue

Reply via email to