Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2203#discussion_r161326958
--- Diff:
storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+ private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+ private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+ private static String hostName = null;
+
+ public static <T> SimpleGauge<T> gauge(T initialValue, String name,
String topologyId, String componentId, Integer port){
+ String metricName = metricName(name, topologyId, componentId,
port);
+ if(REGISTRY.getGauges().containsKey(metricName)){
+ return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+ } else {
+ return REGISTRY.register(metricName, new
SimpleGauge<>(initialValue));
+ }
+ }
+
+ public static DisruptorMetrics disruptorMetrics(String name, String
topologyId, String componentId, Integer port){
+ return new DisruptorMetrics(
+ StormMetricRegistry.gauge(0L, name + "-capacity",
topologyId, componentId, port),
+ StormMetricRegistry.gauge(0L, name + "-population",
topologyId, componentId, port),
+ StormMetricRegistry.gauge(0L, name + "-write-position",
topologyId, componentId, port),
+ StormMetricRegistry.gauge(0L, name + "-read-position",
topologyId, componentId, port),
+ StormMetricRegistry.gauge(0.0, name + "-arrival-rate",
topologyId, componentId, port),
+ StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms",
topologyId, componentId, port),
+ StormMetricRegistry.gauge(0L, name + "-overflow",
topologyId, componentId, port),
+ StormMetricRegistry.gauge(0.0F, name + "-percent-full",
topologyId, componentId, port)
+ );
+ }
+
+ public static Meter meter(String name, WorkerTopologyContext context,
String componentId, Integer taskId, String streamId){
+ String metricName = metricName(name, context.getStormId(),
componentId, streamId,taskId, context.getThisWorkerPort());
+ return REGISTRY.meter(metricName);
+ }
+
+ public static Counter counter(String name, WorkerTopologyContext
context, String componentId, Integer taskId, String streamId){
+ String metricName = metricName(name, context.getStormId(),
componentId, streamId,taskId, context.getThisWorkerPort());
+ return REGISTRY.counter(metricName);
+ }
+
+ public static Counter counter(String name, String topologyId, String
componentId, Integer taskId, Integer workerPort, String streamId){
+ String metricName = metricName(name, topologyId, componentId,
streamId,taskId, workerPort);
+ return REGISTRY.counter(metricName);
+ }
+
+ public static void start(Map<String, Object> stormConfig, DaemonType
type){
+ try {
+ hostName = dotToUnderScore(Utils.localHostname());
+ } catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the
metrics system. Hostname will be reported" +
+ " as 'localhost'.");
+ }
+
+ LOG.info("Starting metrics reporters...");
+ List<Map<String, Object>> reporterList = (List<Map<String,
Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+ if(reporterList != null && reporterList.size() > 0) {
+ for (Map<String, Object> reporterConfig : reporterList) {
+ // only start those requested
+ List<String> daemons = (List<String>)
reporterConfig.get("daemons");
+ for (String daemon : daemons) {
+ if (DaemonType.valueOf(daemon.toUpperCase()) == type) {
+ startReporter(stormConfig, reporterConfig);
+ }
+ }
+ }
+ }
+ }
+
+ public static MetricRegistry registry(){
+ return REGISTRY;
+ }
+
+ private static void startReporter(Map<String, Object> stormConfig,
Map<String, Object> reporterConfig){
+ String clazz = (String)reporterConfig.get("class");
+ StormReporter reporter = null;
+ LOG.info("Attempting to instantiate reporter class: {}", clazz);
+ reporter = Utils.newInstance(clazz);
+ if(reporter != null){
+ reporter.prepare(REGISTRY, stormConfig, reporterConfig);
+ reporter.start();
+ REPORTERS.add(reporter);
+ }
+
+ }
+
+ public static void stop(){
+ for(StormReporter sr : REPORTERS){
+ sr.stop();
+ }
+ }
+
+ public static String metricName(String name, String stormId, String
componentId, String streamId, Integer taskId, Integer workerPort){
+ StringBuilder sb = new StringBuilder("storm.worker.");
+ sb.append(stormId);
+ sb.append(".");
+ sb.append(hostName);
+ sb.append(".");
+ sb.append(dotToUnderScore(componentId));
+ sb.append(".");
+ sb.append(dotToUnderScore(streamId));
+ sb.append(".");
+ sb.append(taskId);
+ sb.append(".");
+ sb.append(workerPort);
+ sb.append("-");
+ sb.append(name);
+ return sb.toString();
+ }
+
+ public static String metricName(String name, String stormId, String
componentId, Integer workerPort) {
+ StringBuilder sb = new StringBuilder("storm.worker.");
+ sb.append(stormId);
+ sb.append(".");
+ sb.append(hostName);
+ sb.append(".");
+ sb.append(dotToUnderScore(componentId));
--- End diff --
The worker transfer queue should use a task id of -1 and a component id of
__system, just like the system bolt. This way it stays consistent across
worker-level metrics.
---