STORM-2153: use taskId in metrics names instead of executorId
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d53800f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d53800f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d53800f Branch: refs/heads/1.x-branch Commit: 8d53800f14ced3fd630a02dfd9537d5900979562 Parents: 8bf7252 Author: P. Taylor Goetz <[email protected]> Authored: Wed Jan 10 14:25:50 2018 -0500 Committer: P. Taylor Goetz <[email protected]> Committed: Wed Jan 10 14:25:50 2018 -0500 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/daemon/executor.clj | 8 ++++---- storm-core/src/clj/org/apache/storm/daemon/task.clj | 8 ++++---- .../org/apache/storm/metrics2/StormMetricRegistry.java | 12 ++++++------ 3 files changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8d53800f/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 993add6..e8d23e5 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -442,7 +442,7 @@ (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta)))) + (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" (:worker-context executor-data) (:component-id executor-data) task-id (:stream tuple-info)) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?] (let [^ISpout spout (:object task-data) @@ -451,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta)))) + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) task-id (:stream tuple-info)) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] (let [task-ids (:task-ids executor-data) @@ -820,7 +820,7 @@ (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when (<= 0 delta) (stats/bolt-acked-tuple! executor-stats - (StormMetricRegistry/counter "acked" worker-context (:component-id executor-data) (pr-str (:executor-id executor-data)) (.getSourceStreamId tuple)) + (StormMetricRegistry/counter "acked" worker-context (:component-id executor-data) task-id (.getSourceStreamId tuple)) (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) @@ -836,7 +836,7 @@ (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when (<= 0 delta) (stats/bolt-failed-tuple! executor-stats - (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) (pr-str (:executor-id executor-data)) (.getSourceStreamId tuple)) + (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) task-id (.getSourceStreamId tuple)) (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) http://git-wip-us.apache.org/repos/asf/storm/blob/8d53800f/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index 7132fc1..9e18331 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -143,9 +143,9 @@ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) - (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (pr-str (:executor-id executor-data)) stream) stream) + (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id task-id stream) stream) (if out-task-id - (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id (pr-str (:executor-id executor-data)) stream) stream 1))) + (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id task-id stream) stream 1))) (if out-task-id [out-task-id]) )) ([^String stream ^List values] @@ -163,8 +163,8 @@ ))) (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) (when (emit-sampler) - (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (pr-str (:executor-id executor-data)) stream) stream) - (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id (pr-str (:executor-id executor-data)) stream) stream (count out-tasks))) + (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id task-id stream) stream) + (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id task-id stream) stream (count out-tasks))) out-tasks))) )) http://git-wip-us.apache.org/repos/asf/storm/blob/8d53800f/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index aea4539..e1305f9 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -68,13 +68,13 @@ public class StormMetricRegistry { ); } - public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ - String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); + public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ + String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); return REGISTRY.meter(metricName); } - public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ - String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); + public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){ + String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); return REGISTRY.counter(metricName); } @@ -124,7 +124,7 @@ public class StormMetricRegistry { } } - public static String metricName(String name, String stormId, String componentId, String streamId, String executorId, Integer workerPort){ + public static String metricName(String name, String stormId, String componentId, String streamId, Integer taskId, Integer workerPort){ StringBuilder sb = new StringBuilder("storm.worker."); sb.append(stormId); sb.append("."); @@ -134,7 +134,7 @@ public class StormMetricRegistry { sb.append("."); sb.append(dotToUnderScore(streamId)); sb.append("."); - sb.append(dotToUnderScore(executorId)); + sb.append(taskId); sb.append("."); sb.append(workerPort); sb.append("-");
