Ethanlm commented on a change in pull request #3251:
URL: https://github.com/apache/storm/pull/3251#discussion_r413811477



##########
File path: 
storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
##########
@@ -35,38 +45,154 @@
     
     private final MetricRegistry registry = new MetricRegistry();
     private final List<StormReporter> reporters = new ArrayList<>();
+    private final ConcurrentMap<Integer, Map<String, Metric>> taskIdMetrics = 
new ConcurrentHashMap<>();
     private String hostName = null;
 
     public <T> SimpleGauge<T> gauge(
         T initialValue, String name, String topologyId, String componentId, 
Integer taskId, Integer port) {
         String metricName = metricName(name, topologyId, componentId, taskId, 
port);
-        return (SimpleGauge<T>) registry.gauge(metricName, () -> new 
SimpleGauge<>(initialValue));
+        SimpleGauge<T> gauge = (SimpleGauge<T>) registry.gauge(metricName, () 
-> new SimpleGauge<>(initialValue));
+        saveMetricTaskIdMapping(taskId, metricName, gauge);
+        return gauge;
+    }
+
+    public <T> Gauge<T> gauge(String name, Gauge<T> gauge, TopologyContext 
context) {
+        String metricName = metricName(name, context);
+        gauge = registry.register(metricName, gauge);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, gauge);
+        return gauge;
     }
 
     public JcMetrics jcMetrics(String name, String topologyId, String 
componentId, Integer taskId, Integer port) {
-        return new JcMetrics(
-            gauge(0L, name + "-capacity", topologyId, componentId, taskId, 
port),
-            gauge(0L, name + "-population", topologyId, componentId, taskId, 
port)
-        );
+        SimpleGauge<Long> capacityGauge = gauge(0L, name + "-capacity", 
topologyId, componentId, taskId, port);
+        SimpleGauge<Long> populationGauge = gauge(0L, name + "-population", 
topologyId, componentId, taskId, port);
+        return new JcMetrics(capacityGauge, populationGauge);
     }
 
     public Meter meter(String name, WorkerTopologyContext context, String 
componentId, Integer taskId, String streamId) {
         String metricName = metricName(name, context.getStormId(), 
componentId, streamId, taskId, context.getThisWorkerPort());
-        return registry.meter(metricName);
+        Meter meter = registry.meter(metricName);
+        saveMetricTaskIdMapping(taskId, metricName, meter);
+        return meter;
+    }
+
+    public Meter meter(String name, TopologyContext context) {
+        String metricName = metricName(name, context);
+        Meter meter = registry.meter(metricName);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, meter);
+        return meter;
     }
 
     public Counter counter(String name, WorkerTopologyContext context, String 
componentId, Integer taskId, String streamId) {
         String metricName = metricName(name, context.getStormId(), 
componentId, streamId, taskId, context.getThisWorkerPort());
-        return registry.counter(metricName);
+        Counter counter = registry.counter(metricName);
+        saveMetricTaskIdMapping(taskId, metricName, counter);
+        return counter;
     }
 
     public Counter counter(String name, String topologyId, String componentId, 
Integer taskId, Integer workerPort, String streamId) {
         String metricName = metricName(name, topologyId, componentId, 
streamId, taskId, workerPort);
-        return registry.counter(metricName);
+        Counter counter = registry.counter(metricName);
+        saveMetricTaskIdMapping(taskId, metricName, counter);
+        return counter;
     }
-    
-    public MetricRegistry registry() {
-        return registry;
+
+    public Counter counter(String name, TopologyContext context) {
+        String metricName = metricName(name, context);
+        Counter counter = registry.counter(metricName);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, counter);
+        return counter;
+    }
+
+    public void metricSet(String prefix, MetricSet set, TopologyContext 
context) {
+        String baseName = metricName(prefix, context);
+        // Instead of registering the metrics as a set, register them 
individually.
+        // This allows fetching the individual metrics by type 
(getTaskGauges())
+        // to work as expected.
+        for (Map.Entry<String, Metric> entry : set.getMetrics().entrySet()) {
+            String metricName = baseName + "." + entry.getKey();
+            Metric metric = registry.register(metricName, entry.getValue());
+            saveMetricTaskIdMapping(context.getThisTaskId(), metricName, 
metric);
+        }
+    }
+
+    public Timer timer(String name, TopologyContext context) {
+        String metricName = metricName(name, context);
+        Timer timer = registry.timer(metricName);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, timer);
+        return timer;
+    }
+
+    public Histogram histogram(String name, TopologyContext context) {
+        String metricName = metricName(name, context);
+        Histogram histogram = registry.histogram(metricName);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, 
histogram);
+        return histogram;
+    }
+
+    private void saveMetricTaskIdMapping(Integer taskId, String name, Metric 
metric) {
+        Map<String, Metric> metrics = taskIdMetrics.computeIfAbsent(taskId, 
(tid) -> new HashMap<>());
+        metrics.put(name, metric);
+    }
+
+    public Map<String, Gauge> getTaskGauges(int taskId) {
+        Map<String, Gauge> taskGaugeMap = new HashMap<>();
+        Collection<Gauge> gauges = registry.getGauges().values();
+        Map<String, Metric> taskMetrics = taskIdMetrics.getOrDefault(taskId, 
Collections.emptyMap());
+        for (Map.Entry<String, Metric> entry : taskMetrics.entrySet()) {
+            if (gauges.contains(entry.getValue())) {

Review comment:
       Instead of going through every metric and checking whether it's a gauge, 
I think we probably just need separate maps for the different metric types.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to