STORM-2153 New Metrics Reporting API * address missing sampling rate * rename field names because we use Counter instead of Meter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/00a382b0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/00a382b0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/00a382b0 Branch: refs/heads/1.x-branch Commit: 00a382b017c1e29863ac4d9a4449086ef79384e4 Parents: 85dbacd Author: Jungtaek Lim <[email protected]> Authored: Thu Nov 30 10:38:27 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Thu Nov 30 10:41:13 2017 +0900 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 22 ++++------ .../src/clj/org/apache/storm/daemon/task.clj | 8 ++-- storm-core/src/clj/org/apache/storm/stats.clj | 43 ++++++++++++-------- 3 files changed, 39 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 720bfa7..0aca4bd 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -257,7 +257,7 @@ :batch-transfer-queue batch-transfer->worker :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf) :suicide-fn (:suicide-fn worker) - :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker) + :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker) :acls (Utils/getWorkerACL storm-conf) :context (ClusterStateContext. 
DaemonType/WORKER)) :type executor-type @@ -280,8 +280,8 @@ (log-message "Got interrupted excpetion shutting thread down...") ((:suicide-fn <>)))) :sampler (mk-stats-sampler storm-conf) - :failed-meter (StormMetricRegistry/counter "failed" worker-context component-id) - :acked-meter (StormMetricRegistry/counter "acked" worker-context component-id) + :failed-counter (StormMetricRegistry/counter "failed" worker-context component-id) + :acked-counter (StormMetricRegistry/counter "acked" worker-context component-id) :spout-throttling-metrics (if (= executor-type :spout) (builtin-metrics/make-spout-throttling-data) nil) @@ -437,27 +437,23 @@ (defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id debug?] (let [^ISpout spout (:object task-data) storm-conf (:storm-conf executor-data) - task-id (:task-id task-data) - failed-meter (:failed-meter executor-data)] + task-id (:task-id task-data)] ;;TODO: need to throttle these when there's lots of failures (when debug? (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id)) (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta - (.inc ^Counter failed-meter) - (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) + (stats/spout-failed-tuple! (:stats executor-data) (:failed-counter executor-data) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?] (let [^ISpout spout (:object task-data) - task-id (:task-id task-data) - acked-meter (:acked-meter executor-data)] + task-id (:task-id task-data)] (when debug? (log-message "SPOUT Acking message " id " " msg-id)) (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (.inc ^Counter acked-meter) - (stats/spout-acked-tuple! 
(:stats executor-data) (:stream tuple-info) time-delta)))) + (stats/spout-acked-tuple! (:stats executor-data) (:acked-counter executor-data) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] (let [task-ids (:task-ids executor-data) @@ -825,8 +821,8 @@ (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when (<= 0 delta) - (.inc ^Counter (:acked-meter (:executor-data task-data))) (stats/bolt-acked-tuple! executor-stats + (:acked-counter (:executor-data task-data)) (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) @@ -841,8 +837,8 @@ (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when (<= 0 delta) - (.inc ^Counter (:failed-meter (:executor-data task-data))) (stats/bolt-failed-tuple! executor-stats + (:failed-counter (:executor-data task-data)) (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index c43d20d..a2f6c54 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -131,7 +131,7 @@ user-context (:user-context task-data) executor-stats (:stats executor-data) debug? (= true (storm-conf TOPOLOGY-DEBUG)) - ^Counter emitted-meter (StormMetricRegistry/counter "emitted" worker-context component-id)] + ^Counter emitted-counter (StormMetricRegistry/counter "emitted" worker-context component-id)] (fn ([^Integer out-task-id ^String stream ^List values] (when debug? @@ -144,8 +144,7 @@ (throw (IllegalArgumentException. 
"Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) - (.inc ^Counter emitted-meter) - (stats/emitted-tuple! executor-stats stream) + (stats/emitted-tuple! executor-stats emitted-counter stream) (if out-task-id (stats/transferred-tuples! executor-stats stream 1))) (if out-task-id [out-task-id]) @@ -165,8 +164,7 @@ ))) (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) (when (emit-sampler) - (.inc ^Counter emitted-meter) - (stats/emitted-tuple! executor-stats stream) + (stats/emitted-tuple! executor-stats emitted-counter stream) (stats/transferred-tuples! executor-stats stream (count out-tasks))) out-tasks))) )) http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/stats.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj index 17d0219..41aaf04 100644 --- a/storm-core/src/clj/org/apache/storm/stats.clj +++ b/storm-core/src/clj/org/apache/storm/stats.clj @@ -26,7 +26,8 @@ WorkerResources]) (:import [org.apache.storm.utils Utils]) (:import [org.apache.storm.scheduler WorkerSlot]) - (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]) + (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric] + (com.codahale.metrics Counter)) (:use [org.apache.storm log util]) (:use [clojure.math.numeric-tower :only [ceil]])) @@ -117,9 +118,11 @@ `(:complete-latencies ~stats)) (defn emitted-tuple! 
- [stats stream] - (let [^MultiCountStatAndMetric emitted (stats-emitted stats)] - (.incBy emitted ^Object stream ^long (stats-rate stats)))) + [stats ^Counter emitted-counter stream] + (let [^MultiCountStatAndMetric emitted (stats-emitted stats) + ^long rate (stats-rate stats)] + (.incBy emitted ^Object stream rate) + (.inc emitted-counter rate))) (defn transferred-tuples! [stats stream amt] @@ -135,30 +138,38 @@ (.record exec-lat key latency-ms))) (defn bolt-acked-tuple! - [^BoltExecutorStats stats component stream latency-ms] + [^BoltExecutorStats stats ^Counter acked-counter component stream latency-ms] (let [key [component stream] ^MultiCountStatAndMetric acked (stats-acked stats) - ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)] - (.incBy acked key (stats-rate stats)) + ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats) + ^long rate (stats-rate stats)] + (.incBy acked key rate) + (.inc acked-counter rate) (.record process-lat key latency-ms))) (defn bolt-failed-tuple! - [^BoltExecutorStats stats component stream latency-ms] + [^BoltExecutorStats stats ^Counter failed-counter component stream latency-ms] (let [key [component stream] - ^MultiCountStatAndMetric failed (stats-failed stats)] - (.incBy failed key (stats-rate stats)))) + ^MultiCountStatAndMetric failed (stats-failed stats) + ^long rate (stats-rate stats)] + (.incBy failed key rate) + (.inc failed-counter rate))) (defn spout-acked-tuple! 
- [^SpoutExecutorStats stats stream latency-ms] + [^SpoutExecutorStats stats ^Counter acked-counter stream latency-ms] (let [^MultiCountStatAndMetric acked (stats-acked stats) - ^MultiLatencyStatAndMetric complete-latencies (stats-complete-latencies stats)] - (.incBy acked stream (stats-rate stats)) + ^MultiLatencyStatAndMetric complete-latencies (stats-complete-latencies stats) + ^long rate (stats-rate stats)] + (.incBy acked stream rate) + (.inc acked-counter rate) (.record complete-latencies stream latency-ms))) (defn spout-failed-tuple! - [^SpoutExecutorStats stats stream latency-ms] - (let [^MultiCountStatAndMetric failed (stats-failed stats)] - (.incBy failed stream (stats-rate stats)))) + [^SpoutExecutorStats stats ^Counter failed-counter stream latency-ms] + (let [^MultiCountStatAndMetric failed (stats-failed stats) + ^long rate (stats-rate stats)] + (.incBy failed stream rate) + (.inc failed-counter rate))) (defn- close-stat! [stat] (.close stat))
