Ethanlm commented on a change in pull request #3329:
URL: https://github.com/apache/storm/pull/3329#discussion_r490535527
##########
File path:
storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
##########
@@ -181,62 +178,52 @@ public void metricSet(String prefix, MetricSet set,
TopologyContext context) {
private static <T extends Metric> void saveMetricTaskIdMapping(Integer
taskId, MetricNames names, T metric, Map<Integer,
Map<String, T>> taskIdMetrics) {
Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid)
-> new HashMap<>());
- if (metrics.get(names.getV2TickName()) != null) {
+ if (metrics.get(names.getShortName()) != null) {
Review comment:
Looks like this can be removed too, since if there is a name collision, the
code in `registry.register` will throw an `IllegalArgumentException`.
##########
File path:
storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
##########
@@ -50,125 +50,182 @@
private final ConcurrentMap<Integer, Map<String, Counter>> taskIdCounters
= new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Map<String, Timer>> taskIdTimers =
new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Map<String, Histogram>>
taskIdHistograms = new ConcurrentHashMap<>();
+ private final ConcurrentMap<TaskMetricDimensions, TaskMetricRepo>
taskMetrics = new ConcurrentHashMap<>();
private String hostName = null;
+ private int port = -1;
+ private String topologyId = null;
public <T> SimpleGauge<T> gauge(
T initialValue, String name, String topologyId, String componentId,
Integer taskId, Integer port) {
+ Gauge gauge = new SimpleGauge<>(initialValue);
MetricNames metricNames = workerMetricName(name, topologyId,
componentId, taskId, port);
- Gauge gauge = registry.gauge(metricNames.getLongName(), () -> new
SimpleGauge<>(initialValue));
+ gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
return (SimpleGauge<T>) gauge;
}
public <T> Gauge<T> gauge(String name, Gauge<T> gauge, TopologyContext
context) {
MetricNames metricNames = topologyMetricName(name, context);
- gauge = registry.register(metricNames.getLongName(), gauge);
+ gauge = registerGauge(metricNames, gauge, context.getThisTaskId(),
context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, gauge,
taskIdGauges);
return gauge;
}
public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId,
String componentId, Integer taskId, Integer port) {
MetricNames metricNames = workerMetricName(name, topologyId,
componentId, taskId, port);
- gauge = registry.register(metricNames.getLongName(), gauge);
+ gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
return gauge;
}
public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId,
String componentId,
String streamId, Integer taskId, Integer port) {
MetricNames metricNames = workerMetricName(name, topologyId,
componentId, streamId, taskId, port);
- gauge = registry.register(metricNames.getLongName(), gauge);
+ gauge = registerGauge(metricNames, gauge, taskId, componentId,
streamId);
saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
return gauge;
}
public Meter meter(String name, WorkerTopologyContext context, String
componentId, Integer taskId, String streamId) {
MetricNames metricNames = workerMetricName(name, context.getStormId(),
componentId, streamId, taskId, context.getThisWorkerPort());
- Meter meter = registry.meter(metricNames.getLongName());
+ Meter meter = registerMeter(metricNames, new Meter(), taskId,
componentId, streamId);
saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
return meter;
}
public Meter meter(String name, WorkerTopologyContext context, String
componentId, Integer taskId) {
MetricNames metricNames = workerMetricName(name, context.getStormId(),
componentId, taskId, context.getThisWorkerPort());
- Meter meter = registry.meter(metricNames.getLongName());
+ Meter meter = registerMeter(metricNames, new Meter(), taskId,
componentId, null);
saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
return meter;
}
public Meter meter(String name, TopologyContext context) {
MetricNames metricNames = topologyMetricName(name, context);
- Meter meter = registry.meter(metricNames.getLongName());
+ Meter meter = registerMeter(metricNames, new Meter(),
context.getThisTaskId(), context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, meter,
taskIdMeters);
return meter;
}
public Counter counter(String name, WorkerTopologyContext context, String
componentId, Integer taskId, String streamId) {
MetricNames metricNames = workerMetricName(name, context.getStormId(),
componentId, streamId, taskId, context.getThisWorkerPort());
- Counter counter = registry.counter(metricNames.getLongName());
+ Counter counter = registerCounter(metricNames, new Counter(), taskId,
componentId, streamId);
saveMetricTaskIdMapping(taskId, metricNames, counter, taskIdCounters);
return counter;
}
public Counter counter(String name, String topologyId, String componentId,
Integer taskId, Integer workerPort, String streamId) {
MetricNames metricNames = workerMetricName(name, topologyId,
componentId, streamId, taskId, workerPort);
- Counter counter = registry.counter(metricNames.getLongName());
+ Counter counter = registerCounter(metricNames, new Counter(), taskId,
componentId, streamId);
saveMetricTaskIdMapping(taskId, metricNames, counter, taskIdCounters);
return counter;
}
public Counter counter(String name, TopologyContext context) {
MetricNames metricNames = topologyMetricName(name, context);
- Counter counter = registry.counter(metricNames.getLongName());
+ Counter counter = registerCounter(metricNames, new Counter(),
context.getThisTaskId(), context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, counter,
taskIdCounters);
return counter;
}
+ public Timer timer(String name, TopologyContext context) {
+ MetricNames metricNames = topologyMetricName(name, context);
+ Timer timer = registerTimer(metricNames, new Timer(),
context.getThisTaskId(), context.getThisComponentId(), null);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, timer,
taskIdTimers);
+ return timer;
+ }
+
+ public Histogram histogram(String name, TopologyContext context) {
+ MetricNames metricNames = topologyMetricName(name, context);
+ Histogram histogram = registerHistogram(metricNames, new Histogram(new
ExponentiallyDecayingReservoir()),
+ context.getThisTaskId(), context.getThisComponentId(), null);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricNames,
histogram, taskIdHistograms);
+ return histogram;
+ }
+
public void metricSet(String prefix, MetricSet set, TopologyContext
context) {
// Instead of registering the metrics as a set, register them
individually.
// This allows fetching the individual metrics by type
(getTaskGauges())
// to work as expected.
+ TaskMetricDimensions taskMetricDimensions = new
TaskMetricDimensions(context.getThisTaskId(),
Review comment:
Looks like this line can be removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]