WIP apply sampling to new metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85dbacdd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85dbacdd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85dbacdd Branch: refs/heads/1.x-branch Commit: 85dbacdd058ee8b3246ff6982a4079713923b66e Parents: e9a9f50 Author: Jungtaek Lim <[email protected]> Authored: Tue Nov 28 11:51:45 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Tue Nov 28 11:51:45 2017 +0900 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/daemon/executor.clj | 8 ++++---- storm-core/src/clj/org/apache/storm/daemon/task.clj | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/85dbacdd/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 94bd7af..720bfa7 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -442,10 +442,10 @@ ;;TODO: need to throttle these when there's lots of failures (when debug? (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id)) - (.inc ^Counter failed-meter) (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta + (.inc ^Counter failed-meter) (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?] @@ -453,10 +453,10 @@ task-id (:task-id task-data) acked-meter (:acked-meter executor-data)] (when debug? (log-message "SPOUT Acking message " id " " msg-id)) - (.inc ^Counter acked-meter) (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta + (.inc ^Counter acked-meter) (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] @@ -823,9 +823,9 @@ (let [delta (tuple-time-delta! tuple)] (when debug? (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) - (.inc ^Counter (:acked-meter (:executor-data task-data))) (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when (<= 0 delta) + (.inc ^Counter (:acked-meter (:executor-data task-data))) (stats/bolt-acked-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) @@ -839,9 +839,9 @@ debug? (= true (storm-conf TOPOLOGY-DEBUG))] (when debug? (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) - (.inc ^Counter (:failed-meter (:executor-data task-data))) (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when (<= 0 delta) + (.inc ^Counter (:failed-meter (:executor-data task-data))) (stats/bolt-failed-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) http://git-wip-us.apache.org/repos/asf/storm/blob/85dbacdd/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index 7162f7f..c43d20d 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -134,7 +134,6 @@ ^Counter emitted-meter (StormMetricRegistry/counter "emitted" worker-context component-id)] (fn ([^Integer out-task-id ^String stream ^List values] - (.inc ^Counter emitted-meter) (when debug? (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values)) (let [target-component (.getComponentId worker-context out-task-id) @@ -145,13 +144,13 @@ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) + (.inc ^Counter emitted-meter) (stats/emitted-tuple! executor-stats stream) (if out-task-id (stats/transferred-tuples! executor-stats stream 1))) (if out-task-id [out-task-id]) )) ([^String stream ^List values] - (.inc ^Counter emitted-meter) (when debug? (log-message "Emitting: " component-id " " stream " " values)) (let [out-tasks (ArrayList.)] @@ -166,6 +165,7 @@ ))) (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) (when (emit-sampler) + (.inc ^Counter emitted-meter) (stats/emitted-tuple! executor-stats stream) (stats/transferred-tuples! executor-stats stream (count out-tasks))) out-tasks)))
