agresch commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r470013538



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
##########
@@ -13,69 +13,151 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = 
"__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = 
"__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = 
"__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ResettingAverageGauge> gauges = new 
ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, 
Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, 
Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> 
topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
+    }
+
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.counters.get(metricName);
+        if (c == null) {

Review comment:
       ok, will address this 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to