move update tuple stat/renderStats methods to corresponding ExecutorStat classes
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f61ea0c0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f61ea0c0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f61ea0c0 Branch: refs/heads/master Commit: f61ea0c0196da4f31126f3f96ffb2bf5551a01d2 Parents: 52d3b58 Author: 卫乐 <[email protected]> Authored: Thu Feb 25 10:59:42 2016 +0800 Committer: 卫乐 <[email protected]> Committed: Thu Feb 25 10:59:42 2016 +0800 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 18 +-- .../src/clj/org/apache/storm/daemon/task.clj | 9 +- .../apache/storm/stats/BoltExecutorStats.java | 45 ++++++ .../jvm/org/apache/storm/stats/CommonStats.java | 40 +++++ .../apache/storm/stats/SpoutExecutorStats.java | 35 +++++ .../jvm/org/apache/storm/stats/StatsUtil.java | 147 +------------------ .../test/clj/org/apache/storm/nimbus_test.clj | 4 +- 7 files changed, 139 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index bca03df..8009f6c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -17,7 +17,7 @@ (:use [org.apache.storm.daemon common]) (:import [org.apache.storm.generated Grouping Grouping$_Fields] [java.io Serializable] - [org.apache.storm.stats StatsUtil]) + [org.apache.storm.stats BoltExecutorStats SpoutExecutorStats]) (:use [org.apache.storm util config log]) (:import [java.util List Random HashMap ArrayList LinkedList Map]) (:import [org.apache.storm ICredentialsListener Thrift]) @@ -408,7 
+408,7 @@ (reify RunningExecutor (render-stats [this] - (clojurify-structure (StatsUtil/renderStats (:stats executor-data)))) + (clojurify-structure (.renderStats (:stats executor-data)))) (get-executor-id [this] executor-id) (credentials-changed [this creds] @@ -448,7 +448,7 @@ (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta - (StatsUtil/spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta)))) + (.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id] (let [storm-conf (:storm-conf executor-data) @@ -459,7 +459,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (StatsUtil/spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta)))) + (.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] (let [task-ids (:task-ids executor-data) @@ -740,7 +740,7 @@ (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) (when delta - (StatsUtil/boltExecuteTuple executor-stats + (.boltExecuteTuple executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) delta))))))) @@ -813,7 +813,7 @@ (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when delta - (StatsUtil/boltAckedTuple executor-stats + (.boltAckedTuple executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) @@ -828,7 +828,7 @@ (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) (task/apply-hooks user-context .boltFail (BoltFailInfo. 
tuple task-id delta)) (when delta - (StatsUtil/boltFailedTuple executor-stats + (.boltFailedTuple executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) @@ -863,7 +863,7 @@ ;; TODO: refactor this to be part of an executor-specific map (defmethod mk-executor-stats :spout [_ rate] - (StatsUtil/mkSpoutStats rate)) + (SpoutExecutorStats/mkSpoutStats rate)) (defmethod mk-executor-stats :bolt [_ rate] - (StatsUtil/mkBoltStats rate)) + (BoltExecutorStats/mkBoltStats rate)) http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index c9f6828..707cdda 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -26,7 +26,6 @@ (:import [org.apache.storm.utils Utils ConfigUtils]) (:import [org.apache.storm.generated ShellComponent JavaObject]) (:import [org.apache.storm.spout ShellSpout]) - (:import [org.apache.storm.stats StatsUtil]) (:import [java.util Collection List ArrayList]) (:import [org.apache.storm Thrift]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])) @@ -140,9 +139,9 @@ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) - (StatsUtil/emittedTuple executor-stats stream) + (.emittedTuple executor-stats stream) (if out-task-id - (StatsUtil/transferredTuples executor-stats stream, 1))) + (.transferredTuples executor-stats stream, 1))) (if out-task-id [out-task-id]) )) ([^String stream ^List values] @@ -162,8 +161,8 @@ ))) (apply-hooks user-context .emit (EmitInfo. 
values stream task-id out-tasks)) (when (emit-sampler) - (StatsUtil/emittedTuple executor-stats stream) - (StatsUtil/transferredTuples executor-stats stream (count out-tasks))) + (.emittedTuple executor-stats stream) + (.transferredTuples executor-stats stream (count out-tasks))) out-tasks))) )) http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index 7909a08..d694bc3 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -17,9 +17,13 @@ */ package org.apache.storm.stats; +import clojure.lang.PersistentVector; +import java.util.HashMap; +import java.util.Map; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; +@SuppressWarnings("unchecked") public class BoltExecutorStats extends CommonStats { public static final String ACKED = "acked"; @@ -59,4 +63,45 @@ public class BoltExecutorStats extends CommonStats { public MultiLatencyStatAndMetric getExecuteLatencies() { return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES); } + + public void boltExecuteTuple(String component, String stream, long latencyMs) { + Object key = PersistentVector.create(component, stream); + this.getExecuted().incBy(key, this.rate); + this.getExecuteLatencies().record(key, latencyMs); + } + + public void boltAckedTuple(String component, String stream, long latencyMs) { + Object key = PersistentVector.create(component, stream); + this.getAcked().incBy(key, this.rate); + this.getProcessLatencies().record(key, latencyMs); + } + + public void boltFailedTuple(String component, String stream, long latencyMs) { + Object key = 
PersistentVector.create(component, stream); + this.getFailed().incBy(key, this.rate); + + } + + public Map renderStats() { + cleanupStats(); + Map ret = new HashMap(); + ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); + ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS)); + StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT); + + return ret; + } + + public void cleanupStats() { + super.cleanupStats(); + for (String field : BOLT_FIELDS) { + cleanupStat(this.get(field)); + } + } + + public static BoltExecutorStats mkBoltStats(int rate) { + BoltExecutorStats stats = new BoltExecutorStats(); + stats.setRate(rate); + return stats; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java index a8bf706..93d42a4 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java @@ -21,7 +21,9 @@ import java.util.HashMap; import java.util.Map; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.internal.MultiCountStatAndMetric; +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; +@SuppressWarnings("unchecked") public class CommonStats { public static final int NUM_STAT_BUCKETS = 20; @@ -62,4 +64,42 @@ public class CommonStats { protected void put(String field, Object value) { StatsUtil.putRawKV(metricMap, field, value); } + + public void emittedTuple(String stream) { + this.getEmitted().incBy(stream, this.rate); + } + + public void transferredTuples(String stream, int amount) { + this.getTransferred().incBy(stream, this.rate * amount); + } + + protected void cleanupStats() { + for (String field : COMMON_FIELDS) { + cleanupStat(this.get(field)); + } + } + + protected void 
cleanupStat(IMetric metric) { + if (metric instanceof MultiCountStatAndMetric) { + ((MultiCountStatAndMetric) metric).close(); + } else if (metric instanceof MultiLatencyStatAndMetric) { + ((MultiLatencyStatAndMetric) metric).close(); + } + } + + protected Map valueStats(String[] fields) { + Map ret = new HashMap(); + for (String field : fields) { + IMetric metric = this.get(field); + if (metric instanceof MultiCountStatAndMetric) { + StatsUtil.putRawKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts()); + } else if (metric instanceof MultiLatencyStatAndMetric) { + StatsUtil.putRawKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg()); + } + } + StatsUtil.putRawKV(ret, CommonStats.RATE, this.getRate()); + + return ret; + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java index 621ac24..d6d9162 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java @@ -17,9 +17,12 @@ */ package org.apache.storm.stats; +import java.util.HashMap; +import java.util.Map; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; +@SuppressWarnings("unchecked") public class SpoutExecutorStats extends CommonStats { public static final String ACKED = "acked"; @@ -46,4 +49,36 @@ public class SpoutExecutorStats extends CommonStats { public MultiLatencyStatAndMetric getCompleteLatencies() { return (MultiLatencyStatAndMetric) this.get(COMPLETE_LATENCIES); } + + public void spoutAckedTuple(String stream, long latencyMs) { + this.getAcked().incBy(stream, this.rate); + this.getCompleteLatencies().record(stream, 
latencyMs); + } + + public void spoutFailedTuple(String stream, long latencyMs) { + this.getFailed().incBy(stream, this.rate); + } + + public Map renderStats() { + cleanupStats(); + Map ret = new HashMap(); + ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); + ret.putAll(valueStats(SpoutExecutorStats.SPOUT_FIELDS)); + StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT); + + return ret; + } + + public void cleanupStats() { + super.cleanupStats(); + for (String field : SpoutExecutorStats.SPOUT_FIELDS) { + cleanupStat(this.get(field)); + } + } + + public static SpoutExecutorStats mkSpoutStats(int rate) { + SpoutExecutorStats stats = new SpoutExecutorStats(); + stats.setRate(rate); + return stats; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java index 144872f..22ececf 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -48,9 +48,6 @@ import org.apache.storm.generated.SpoutStats; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.TopologyPageInfo; import org.apache.storm.generated.TopologyStats; -import org.apache.storm.metric.api.IMetric; -import org.apache.storm.metric.internal.MultiCountStatAndMetric; -import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,9 +56,11 @@ import org.slf4j.LoggerFactory; public class StatsUtil { private static final Logger logger = LoggerFactory.getLogger(StatsUtil.class); - private static final String TYPE = "type"; + public static final String TYPE = "type"; private static final String SPOUT = "spout"; private static final String BOLT 
= "bolt"; + public static final Keyword KW_SPOUT = keyword(SPOUT); + public static final Keyword KW_BOLT = keyword(BOLT); private static final String UPTIME = "uptime"; private static final String HOST = "host"; @@ -111,9 +110,6 @@ public class StatsUtil { private static final String CID_SID_TO_IN_STATS = "cid+sid->input-stats"; private static final String WORKERS_SET = "workers-set"; - private static final Keyword KW_SPOUT = keyword(SPOUT); - private static final Keyword KW_BOLT = keyword(BOLT); - public static final int TEN_MIN_IN_SECONDS = 60 * 10; public static final String TEN_MIN_IN_SECONDS_STR = TEN_MIN_IN_SECONDS + ""; @@ -124,120 +120,6 @@ public class StatsUtil { // ===================================================================================== - // update stats methods - // ===================================================================================== - - public static BoltExecutorStats mkBoltStats(int rate) { - BoltExecutorStats stats = new BoltExecutorStats(); - stats.setRate(rate); - return stats; - } - - public static SpoutExecutorStats mkSpoutStats(int rate) { - SpoutExecutorStats stats = new SpoutExecutorStats(); - stats.setRate(rate); - return stats; - } - - public static void emittedTuple(CommonStats stats, String stream) { - stats.getEmitted().incBy(stream, stats.rate); - } - - public static void transferredTuples(CommonStats stats, String stream, int amount) { - stats.getTransferred().incBy(stream, stats.rate * amount); - } - - public static void boltExecuteTuple(BoltExecutorStats stats, String component, String stream, long latencyMs) { - Object key = PersistentVector.create(component, stream); - stats.getExecuted().incBy(key, stats.rate); - stats.getExecuteLatencies().record(key, latencyMs); - } - - public static void boltAckedTuple(BoltExecutorStats stats, String component, String stream, long latencyMs) { - Object key = PersistentVector.create(component, stream); - stats.getAcked().incBy(key, stats.rate); - 
stats.getProcessLatencies().record(key, latencyMs); - } - - public static void boltFailedTuple(BoltExecutorStats stats, String component, String stream, long latencyMs) { - Object key = PersistentVector.create(component, stream); - stats.getFailed().incBy(key, stats.rate); - - } - - public static void spoutAckedTuple(SpoutExecutorStats stats, String stream, long latencyMs) { - stats.getAcked().incBy(stream, stats.rate); - stats.getCompleteLatencies().record(stream, latencyMs); - } - - public static void spoutFailedTuple(SpoutExecutorStats stats, String stream, long latencyMs) { - stats.getFailed().incBy(stream, stats.rate); - } - - private static void cleanupStat(IMetric metric) { - if (metric instanceof MultiCountStatAndMetric) { - ((MultiCountStatAndMetric) metric).close(); - } else if (metric instanceof MultiLatencyStatAndMetric) { - ((MultiLatencyStatAndMetric) metric).close(); - } - } - - public static Map renderStats(SpoutExecutorStats stats) { - cleanupSpoutStats(stats); - Map ret = new HashMap(); - ret.putAll(valueStats(stats, CommonStats.COMMON_FIELDS)); - ret.putAll(valueStats(stats, SpoutExecutorStats.SPOUT_FIELDS)); - putRawKV(ret, TYPE, KW_SPOUT); - - return ret; - } - - public static Map renderStats(BoltExecutorStats stats) { - cleanupBoltStats(stats); - Map ret = new HashMap(); - ret.putAll(valueStats(stats, CommonStats.COMMON_FIELDS)); - ret.putAll(valueStats(stats, BoltExecutorStats.BOLT_FIELDS)); - putRawKV(ret, TYPE, KW_BOLT); - - return ret; - } - - public static void cleanupSpoutStats(SpoutExecutorStats stats) { - cleanupCommonStats(stats); - for (String field : SpoutExecutorStats.SPOUT_FIELDS) { - cleanupStat(stats.get(field)); - } - } - - public static void cleanupBoltStats(BoltExecutorStats stats) { - cleanupCommonStats(stats); - for (String field : BoltExecutorStats.BOLT_FIELDS) { - cleanupStat(stats.get(field)); - } - } - - public static void cleanupCommonStats(CommonStats stats) { - for (String field : CommonStats.COMMON_FIELDS) { - 
cleanupStat(stats.get(field)); - } - } - - private static Map valueStats(CommonStats stats, String[] fields) { - Map ret = new HashMap(); - for (String field : fields) { - IMetric metric = stats.get(field); - if (metric instanceof MultiCountStatAndMetric) { - putRawKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts()); - } else if (metric instanceof MultiLatencyStatAndMetric) { - putRawKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg()); - } - } - putRawKV(ret, CommonStats.RATE, stats.getRate()); - - return ret; - } - - // ===================================================================================== // aggregation stats methods // ===================================================================================== @@ -1166,9 +1048,6 @@ public class StatsUtil { return ret; } - /** - * called in nimbus.clj - */ public static ComponentPageInfo aggCompExecsStats( Map exec2hostPort, Map task2component, Map beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId) { @@ -1184,9 +1063,6 @@ public class StatsUtil { // clojurify stats methods // ===================================================================================== - /** - * called in converter.clj - */ public static Map clojurifyStats(Map stats) { Map ret = new HashMap(); for (Object o : stats.entrySet()) { @@ -1245,9 +1121,6 @@ public class StatsUtil { return ret; } - /** - * caller: nimbus.clj - */ public static List extractNodeInfosFromHbForComp( Map exec2hostPort, Map task2component, boolean includeSys, String compId) { List ret = new ArrayList(); @@ -1340,7 +1213,7 @@ public class StatsUtil { /** - * caller: core.clj + * computes max bolt capacity * * @param executorSumms a list of ExecutorSummary * @return max bolt capacity @@ -1774,9 +1647,6 @@ public class StatsUtil { return ret; } - /** - * called in converter.clj - */ public static Map thriftifyStats(List stats) { Map ret = new HashMap(); for (Object o : stats) { 
@@ -1791,9 +1661,6 @@ public class StatsUtil { return ret; } - /** - * called in nimbus.clj - */ public static ExecutorStats thriftifyExecutorStats(Map stats) { ExecutorStats ret = new ExecutorStats(); ExecutorSpecificStats specificStats = thriftifySpecificStats(stats); @@ -2091,16 +1958,10 @@ public class StatsUtil { return t / c; } - /** - * caller: core.clj - */ public static String floatStr(double n) { return String.format("%.3f", n); } - /** - * caller: core.clj - */ public static String errorSubset(String errorStr) { return errorStr.substring(0, 200); } http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index a76db54..5964e6f 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -23,7 +23,7 @@ [org.apache.storm.nimbus InMemoryTopologyActionNotifier] [org.apache.storm.generated GlobalStreamId] [org.apache.storm Thrift] - [org.apache.storm.stats StatsUtil]) + [org.apache.storm.stats BoltExecutorStats]) (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:import [org.apache.storm.scheduler INimbus]) (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) @@ -141,7 +141,7 @@ stats (:executor-stats curr-beat)] (.worker-heartbeat! state storm-id node port {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 - :executor-stats (merge stats {executor (clojurify-structure (StatsUtil/renderStats (StatsUtil/mkBoltStats 20)))})} + :executor-stats (merge stats {executor (clojurify-structure (.renderStats (BoltExecutorStats/mkBoltStats 20)))})} ))) (defn slot-assignments [cluster storm-id]
