Ethanlm commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r469678349



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
##########
@@ -13,69 +13,151 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ResettingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
+    }
+
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.counters.get(metricName);
+        if (c == null) {

Review comment:
       I am not sure about `spoutAckedTuple` at this point, but methods like
`boltFailedTuple` and `boltAckedTuple` may be invoked from multiple threads
if the bolt is multi-threaded, in which case this `get` followed by a null
check would not be atomic.
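
One possible way to address this, as a minimal sketch rather than the change the PR actually adopted: let `ConcurrentHashMap.computeIfAbsent` create the counter, so that lookup and creation happen as one atomic step. The class `CounterRegistrySketch` and its methods are hypothetical, and `java.util.concurrent.atomic.LongAdder` stands in for the Codahale `Counter` so the example has no external dependencies.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for TaskMetrics. LongAdder replaces the Codahale
// Counter (an assumption made only to keep the sketch dependency-free).
class CounterRegistrySketch {
    private final ConcurrentMap<String, LongAdder> counters = new ConcurrentHashMap<>();

    // ConcurrentHashMap.computeIfAbsent applies the factory at most once per
    // key, atomically, so concurrent callers share a single counter instance.
    LongAdder counterFor(String metricName) {
        return counters.computeIfAbsent(metricName, name -> new LongAdder());
    }

    // Mirrors the "<metric name>-<stream id>" naming used in the diff.
    void ackedTuple(String streamId) {
        counterFor("__ack-count-" + streamId).increment();
    }

    // Returns 0 for a metric that has never been recorded.
    long count(String metricName) {
        LongAdder c = counters.get(metricName);
        return c == null ? 0L : c.sum();
    }
}
```

With a plain `get` followed by `if (c == null)`, two threads could both observe `null` and each create a counter, and increments on the instance that loses the race would be dropped. In the real class the factory would presumably go through `StormMetricRegistry` instead of `new LongAdder()`.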




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

