STORM-2153: move task-metrics from executor to task to avoid map lookup
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b257ba47 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b257ba47 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b257ba47 Branch: refs/heads/1.x-branch Commit: b257ba47aae42d5486901a1252cd9b5c0d9ad70e Parents: 8e2f7e7 Author: Jungtaek Lim <[email protected]> Authored: Fri Jan 12 06:52:22 2018 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Fri Jan 12 06:52:22 2018 +0900 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/daemon/executor.clj | 9 ++++----- storm-core/src/clj/org/apache/storm/daemon/task.clj | 9 +++++---- .../src/jvm/org/apache/storm/metrics2/TaskMetrics.java | 8 -------- 3 files changed, 9 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b257ba47/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 3dd7289..3af9b2c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -265,7 +265,6 @@ :stats (mk-executor-stats <> (sampling-rate storm-conf)) :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) - :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last task-ids) worker-context component-id) :stream->component->grouper (outbound-components worker-context component-id storm-conf) :report-error (throttled-report-error-fn <>) :report-error-and-die (fn [error] @@ -443,7 +442,7 @@ (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-failed-tuple! (:stats executor-data) (.getFailed ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (:stream tuple-info)) (:stream tuple-info) time-delta)))) + (stats/spout-failed-tuple! (:stats executor-data) (.getFailed ^TaskMetrics (:task-metrics task-data) (:stream tuple-info)) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?] (let [^ISpout spout (:object task-data) @@ -452,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (.getAcked ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (:stream tuple-info)) (:stream tuple-info) time-delta)))) + (stats/spout-acked-tuple! (:stats executor-data) (.getAcked ^TaskMetrics (:task-metrics task-data) (:stream tuple-info)) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] (let [task-ids (:task-ids executor-data) @@ -821,7 +820,7 @@ (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when (<= 0 delta) (stats/bolt-acked-tuple! executor-stats - (.getAcked ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (.getSourceStreamId tuple)) + (.getAcked ^TaskMetrics (:task-metrics task-data) (.getSourceStreamId tuple)) (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) @@ -837,7 +836,7 @@ (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when (<= 0 delta) (stats/bolt-failed-tuple! executor-stats - (.getFailed ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (.getSourceStreamId tuple)) + (.getFailed ^TaskMetrics (:task-metrics task-data) (.getSourceStreamId tuple)) (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) http://git-wip-us.apache.org/repos/asf/storm/blob/b257ba47/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index 68af75b..82bd2c5 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -143,9 +143,9 @@ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) - (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream) + (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (:task-metrics task-data) stream) stream) (if out-task-id - (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream 1))) + (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (:task-metrics task-data) stream) stream 1))) (if out-task-id [out-task-id]) )) ([^String stream ^List values] @@ -163,8 +163,8 @@ ))) (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) (when (emit-sampler) - (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream) - (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream (count out-tasks))) + (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (:task-metrics task-data) stream) stream) + (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (:task-metrics task-data) stream) stream (count out-tasks))) out-tasks))) )) @@ -175,6 +175,7 @@ :system-context (system-topology-context (:worker executor-data) executor-data task-id) :user-context (user-topology-context (:worker executor-data) executor-data task-id) :builtin-metrics (builtin-metrics/make-data (:type executor-data) (:stats executor-data)) + :task-metrics (TaskMetrics. (:worker-context executor-data) (:component-id executor-data) task-id) :tasks-fn (mk-tasks-fn <>) :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data)))) http://git-wip-us.apache.org/repos/asf/storm/blob/b257ba47/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java index 550b176..05c62da 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java @@ -78,12 +78,4 @@ public class TaskMetrics { } return c; } - - public static Map<Integer, TaskMetrics> taskMetricsMap(Integer startTaskId, Integer endTaskId, WorkerTopologyContext context, String componentId){ - Map<Integer, TaskMetrics> retval = new HashMap<>(); - for (int i = startTaskId; i < endTaskId + 1; i++) { - retval.put(i, new TaskMetrics(context, componentId, i)); - } - return retval; - } }
