STORM-2153: eliminate string concatenation when looking up metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d3c00ee7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d3c00ee7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d3c00ee7 Branch: refs/heads/1.x-branch Commit: d3c00ee7705b2d7b1bba4afd1146fb4c258a471d Parents: 8d53800 Author: P. Taylor Goetz <[email protected]> Authored: Thu Jan 11 15:39:37 2018 -0500 Committer: P. Taylor Goetz <[email protected]> Committed: Thu Jan 11 15:39:37 2018 -0500 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 11 +-- .../src/clj/org/apache/storm/daemon/task.clj | 12 ++-- .../storm/metrics2/StormMetricRegistry.java | 5 ++ .../org/apache/storm/metrics2/TaskMetrics.java | 72 ++++++++++++++++++++ 4 files changed, 89 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d3c00ee7/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index e8d23e5..3dd7289 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -35,7 +35,7 @@ (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric]) (:import [org.apache.storm Config Constants]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) - (:import [org.apache.storm.metrics2 StormMetricRegistry]) + (:import [org.apache.storm.metrics2 StormMetricRegistry TaskMetrics]) (:import [com.codahale.metrics Meter Counter]) (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) (:import [java.util.concurrent ConcurrentLinkedQueue]) @@ -265,6 +265,7 @@ :stats (mk-executor-stats <> (sampling-rate storm-conf)) :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) + :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last task-ids) worker-context component-id) :stream->component->grouper (outbound-components worker-context component-id storm-conf) :report-error (throttled-report-error-fn <>) :report-error-and-die (fn [error] @@ -442,7 +443,7 @@ (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" (:worker-context executor-data) (:component-id executor-data) task-id (:stream tuple-info)) (:stream tuple-info) time-delta)))) + (stats/spout-failed-tuple! (:stats executor-data) (.getFailed ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (:stream tuple-info)) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?] (let [^ISpout spout (:object task-data) @@ -451,7 +452,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) task-id (:stream tuple-info)) (:stream tuple-info) time-delta)))) + (stats/spout-acked-tuple! (:stats executor-data) (.getAcked ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (:stream tuple-info)) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] (let [task-ids (:task-ids executor-data) @@ -820,7 +821,7 @@ (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when (<= 0 delta) (stats/bolt-acked-tuple! executor-stats - (StormMetricRegistry/counter "acked" worker-context (:component-id executor-data) task-id (.getSourceStreamId tuple)) + (.getAcked ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (.getSourceStreamId tuple)) (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) @@ -836,7 +837,7 @@ (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when (<= 0 delta) (stats/bolt-failed-tuple! executor-stats - (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) task-id (.getSourceStreamId tuple)) + (.getFailed ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (.getSourceStreamId tuple)) (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d3c00ee7/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index 9e18331..26ce76c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -23,11 +23,11 @@ (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo]) (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext]) - (:import [org.apache.storm.metrics2 StormMetricRegistry]) + (:import [org.apache.storm.metrics2 StormMetricRegistry TaskMetrics]) (:import [org.apache.storm.utils Utils]) (:import [org.apache.storm.generated ShellComponent JavaObject]) (:import [org.apache.storm.spout ShellSpout]) - (:import [java.util Collection List ArrayList]) + (:import [java.util Collection List ArrayList Map]) (:import [com.codahale.metrics Meter Counter]) (:require [org.apache.storm [thrift :as thrift] @@ -143,9 +143,9 @@ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) - (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id task-id stream) stream) + (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream) (if out-task-id - (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id task-id stream) stream 1))) + (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream 1))) (if out-task-id [out-task-id]) )) ([^String stream ^List values] @@ -163,8 +163,8 @@ ))) (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) (when (emit-sampler) - (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id task-id stream) stream) - (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id task-id stream) stream (count out-tasks))) + (stats/emitted-tuple! executor-stats (.getEmitted (.get ^Map (:task-metrics executor-data) task-id) stream) stream) + (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream (count out-tasks))) out-tasks))) )) http://git-wip-us.apache.org/repos/asf/storm/blob/d3c00ee7/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index e1305f9..e0023fd 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -78,6 +78,11 @@ public class StormMetricRegistry { return REGISTRY.counter(metricName); } + public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){ + String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); + return REGISTRY.counter(metricName); + } + public static void start(Map<String, Object> stormConfig, DaemonType type){ try { hostName = dotToUnderScore(Utils.localHostname()); http://git-wip-us.apache.org/repos/asf/storm/blob/d3c00ee7/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java new file mode 100644 index 0000000..5bb01d2 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java @@ -0,0 +1,72 @@ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import org.apache.storm.task.WorkerTopologyContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TaskMetrics { + ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>(); + ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>(); + ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>(); + ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>(); + + private String topologyId; + private String componentId; + private Integer taskId; + private Integer workerPort; + + public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid){ + this.topologyId = context.getStormId(); + this.componentId = componentId; + this.taskId = taskid; + this.workerPort = context.getThisWorkerPort(); + } + + public Counter getAcked(String streamId) { + Counter c = this.ackedByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter("acked", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.ackedByStream.put(streamId, c); + } + return c; + } + + public Counter getFailed(String streamId) { + Counter c = this.ackedByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter("failed", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.failedByStream.put(streamId, c); + } + return c; + } + + public Counter getEmitted(String streamId) { + Counter c = this.emittedByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter("emitted", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.emittedByStream.put(streamId, c); + } + return c; + } + + public Counter getTransferred(String streamId) { + Counter c = this.transferredByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter("transferred", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.transferredByStream.put(streamId, c); + } + return c; + } + + public static Map<Integer, TaskMetrics> taskMetricsMap(Integer startTaskId, Integer endTaskId, WorkerTopologyContext context, String componentId){ + Map<Integer, TaskMetrics> retval = new HashMap<>(); + for (int i = startTaskId; i < endTaskId + 1; i++) { + retval.put(i, new TaskMetrics(context, componentId, i)); + } + return retval; + } +}
