changed according to comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/abe9b676 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/abe9b676 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/abe9b676 Branch: refs/heads/master Commit: abe9b676c0f15fa47809ae4a094001e345521de6 Parents: bfbd375 Author: å«ä¹ <[email protected]> Authored: Mon Feb 29 11:49:26 2016 +0800 Committer: å«ä¹ <[email protected]> Committed: Mon Feb 29 11:49:26 2016 +0800 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 8 +-- .../src/clj/org/apache/storm/daemon/nimbus.clj | 4 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 4 +- .../apache/storm/stats/BoltExecutorStats.java | 57 ++++++++++++-------- .../jvm/org/apache/storm/stats/CommonStats.java | 31 ++++++----- .../apache/storm/stats/SpoutExecutorStats.java | 33 +++++++----- .../jvm/org/apache/storm/stats/StatsUtil.java | 7 ++- .../jvm/org/apache/storm/utils/ConfigUtils.java | 8 ++- .../test/clj/org/apache/storm/nimbus_test.clj | 2 +- .../clj/org/apache/storm/supervisor_test.clj | 11 ++-- 10 files changed, 99 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 3b4e330..4bbce10 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -257,8 +257,8 @@ :batch-transfer-queue batch-transfer->worker :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf) :suicide-fn (:suicide-fn worker) - :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf) - (ClusterStateContext. DaemonType/WORKER)) + :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf) + (ClusterStateContext. DaemonType/WORKER)) :type executor-type ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field) :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf)) @@ -861,7 +861,7 @@ ;; TODO: refactor this to be part of an executor-specific map (defmethod mk-executor-stats :spout [_ rate] - (SpoutExecutorStats/mkSpoutStats rate)) + (SpoutExecutorStats. rate)) (defmethod mk-executor-stats :bolt [_ rate] - (BoltExecutorStats/mkBoltStats rate)) + (BoltExecutorStats. rate)) http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index f36cf7d..83f73d5 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -916,7 +916,7 @@ storm-cluster-state (:storm-cluster-state nimbus) ^INimbus inimbus (:inimbus nimbus) ;; read all the topologies - topology-ids (.activeStorms storm-cluster-state) + topology-ids (.activeStorms storm-cluster-state) topologies (into {} (for [tid topology-ids] {tid (read-topology-details nimbus tid)})) topologies (Topologies. topologies) @@ -1800,7 +1800,7 @@ storm-name (topology-conf TOPOLOGY-NAME) _ (check-authorization! nimbus storm-name topology-conf "getLogConfig") storm-cluster-state (:storm-cluster-state nimbus) - log-config (.topologyLogConfig storm-cluster-state id nil)] + log-config (.topologyLogConfig storm-cluster-state id nil)] (if log-config log-config (LogConfig.)))) (^String getTopologyConf [this ^String id] http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index aad0e38..b9cf2d7 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -1222,7 +1222,7 @@ (json-response {"status" "ok" "id" host-port} (m "callback"))))) - + (GET "/api/v1/topology/:id/profiling/dumpheap/:host-port" [:as {:keys [servlet-request]} id host-port & m] (populate-context! servlet-request) @@ -1238,7 +1238,7 @@ (json-response {"status" "ok" "id" host-port} (m "callback"))))) - + (GET "/" [:as {cookies :cookies}] (mark! ui:num-main-page-http-requests) (resp/redirect "/index.html")) http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index d694bc3..f6dad09 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -17,9 +17,14 @@ */ package org.apache.storm.stats; -import clojure.lang.PersistentVector; +import com.google.common.collect.Lists; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.storm.generated.BoltStats; +import org.apache.storm.generated.ExecutorSpecificStats; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.SpoutStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @@ -34,14 +39,14 @@ public class BoltExecutorStats extends CommonStats { public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, PROCESS_LATENCIES, EXECUTE_LATENCIES}; - public BoltExecutorStats() { - super(); + public BoltExecutorStats(int rate) { + super(rate); - put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); - put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); - put(EXECUTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); - put(PROCESS_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); - put(EXECUTE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); + this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(EXECUTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(PROCESS_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); + this.put(EXECUTE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); } public MultiCountStatAndMetric getAcked() { @@ -65,19 +70,19 @@ public class BoltExecutorStats extends CommonStats { } public void boltExecuteTuple(String component, String stream, long latencyMs) { - Object key = PersistentVector.create(component, stream); + List key = Lists.newArrayList(component, stream); this.getExecuted().incBy(key, this.rate); this.getExecuteLatencies().record(key, latencyMs); } public void boltAckedTuple(String component, String stream, long latencyMs) { - Object key = PersistentVector.create(component, stream); + List key = Lists.newArrayList(component, stream); this.getAcked().incBy(key, this.rate); this.getProcessLatencies().record(key, latencyMs); } public void boltFailedTuple(String component, String stream, long latencyMs) { - Object key = PersistentVector.create(component, stream); + List key = Lists.newArrayList(component, stream); this.getFailed().incBy(key, this.rate); } @@ -92,16 +97,22 @@ public class BoltExecutorStats extends CommonStats { return ret; } - public void cleanupStats() { - super.cleanupStats(); - for (String field : BOLT_FIELDS) { - cleanupStat(this.get(field)); - } - } - - public static BoltExecutorStats mkBoltStats(int rate) { - BoltExecutorStats stats = new BoltExecutorStats(); - stats.setRate(rate); - return stats; - } +// public ExecutorStats renderStats() { +// cleanupStats(); +// +// ExecutorStats ret = new ExecutorStats(); +// ret.set_emitted(valueStat(EMITTED)); +// ret.set_transferred(valueStat(TRANSFERRED)); +// ret.set_rate(this.rate); +// +// BoltStats boltStats = new BoltStats( +// StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), +// StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), +// StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY), +// StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), +// StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY)); +// ret.set_specific(ExecutorSpecificStats.bolt(boltStats)); +// +// return ret; +// } } http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java index 93d42a4..e386413 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java @@ -33,22 +33,19 @@ public class CommonStats { public static final String TRANSFERRED = "transferred"; public static final String[] COMMON_FIELDS = {EMITTED, TRANSFERRED}; - protected int rate; + protected final int rate; protected final Map metricMap = new HashMap(); - public CommonStats() { - put(EMITTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); - put(TRANSFERRED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + public CommonStats(int rate) { + this.rate = rate; + this.put(EMITTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); + this.put(TRANSFERRED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); } public int getRate() { return this.rate; } - public void setRate(int rate) { - this.rate = rate; - } - public MultiCountStatAndMetric getEmitted() { return (MultiCountStatAndMetric) get(EMITTED); } @@ -73,13 +70,13 @@ public class CommonStats { this.getTransferred().incBy(stream, this.rate * amount); } - protected void cleanupStats() { - for (String field : COMMON_FIELDS) { - cleanupStat(this.get(field)); + public void cleanupStats() { + for (Object imetric : this.metricMap.values()) { + cleanupStat((IMetric) imetric); } } - protected void cleanupStat(IMetric metric) { + private void cleanupStat(IMetric metric) { if (metric instanceof MultiCountStatAndMetric) { ((MultiCountStatAndMetric) metric).close(); } else if (metric instanceof MultiLatencyStatAndMetric) { @@ -102,4 +99,14 @@ public class CommonStats { return ret; } + protected Map valueStat(String field) { + IMetric metric = this.get(field); + if (metric instanceof MultiCountStatAndMetric) { + return ((MultiCountStatAndMetric) metric).getTimeCounts(); + } else if (metric instanceof MultiLatencyStatAndMetric) { + return ((MultiLatencyStatAndMetric) metric).getTimeLatAvg(); + } + return null; + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java index d6d9162..918ae06 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java @@ -19,6 +19,9 @@ package org.apache.storm.stats; import java.util.HashMap; import java.util.Map; +import org.apache.storm.generated.ExecutorSpecificStats; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.SpoutStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @@ -31,8 +34,8 @@ public class SpoutExecutorStats extends CommonStats { public static final String[] SPOUT_FIELDS = {ACKED, FAILED, COMPLETE_LATENCIES}; - public SpoutExecutorStats() { - super(); + public SpoutExecutorStats(int rate) { + super(rate); this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); this.put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); this.put(COMPLETE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); @@ -69,16 +72,18 @@ public class SpoutExecutorStats extends CommonStats { return ret; } - public void cleanupStats() { - super.cleanupStats(); - for (String field : SpoutExecutorStats.SPOUT_FIELDS) { - cleanupStat(this.get(field)); - } - } - - public static SpoutExecutorStats mkSpoutStats(int rate) { - SpoutExecutorStats stats = new SpoutExecutorStats(); - stats.setRate(rate); - return stats; - } +// public ExecutorStats renderStats() { +// cleanupStats(); +// +// ExecutorStats ret = new ExecutorStats(); +// ret.set_emitted(valueStat(EMITTED)); +// ret.set_transferred(valueStat(TRANSFERRED)); +// ret.set_rate(this.rate); +// +// SpoutStats spoutStats = new SpoutStats( +// valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES)); +// ret.set_specific(ExecutorSpecificStats.spout(spoutStats)); +// +// return ret; +// } } http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java index 75ec292..efdf8e0 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -113,10 +113,10 @@ public class StatsUtil { public static final int TEN_MIN_IN_SECONDS = 60 * 10; public static final String TEN_MIN_IN_SECONDS_STR = TEN_MIN_IN_SECONDS + ""; - private static final IdentityTransformer IDENTITY = new IdentityTransformer(); + public static final IdentityTransformer IDENTITY = new IdentityTransformer(); private static final ToStringTransformer TO_STRING = new ToStringTransformer(); private static final FromGlobalStreamIdTransformer FROM_GSID = new FromGlobalStreamIdTransformer(); - private static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer(); + public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer(); // ===================================================================================== @@ -1659,6 +1659,9 @@ public class StatsUtil { Map executorStat = (Map) stat.get(1); ExecutorInfo executorInfo = new ExecutorInfo(start, end); ret.put(executorInfo, thriftifyExecutorStats(executorStat)); +// ExecutorStats executorStat = (ExecutorStats) stat.get(1); +// ExecutorInfo executorInfo = new ExecutorInfo(start, end); +// ret.put(executorInfo, executorStat); } return ret; } http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java index 1ac0249..36d4352 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -44,7 +44,7 @@ public class ConfigUtils { // A singleton instance allows us to mock delegated static methods in our // tests by subclassing. - private static ConfigUtils _instance = new ConfigUtils();; + private static ConfigUtils _instance = new ConfigUtils(); /** * Provide an instance of this class for delegates to use. To mock out @@ -66,7 +66,11 @@ public class ConfigUtils { dir = System.getProperty("storm.log.dir"); } else if ((conf = readStormConfig()).get("storm.log.dir") != null) { dir = String.valueOf(conf.get("storm.log.dir")); - } else { + } else if (System.getProperty("storm.local.dir") != null) { + dir = System.getProperty("storm.local.dir") + FILE_SEPARATOR + "logs"; + } else if (conf.get("storm.local.dir") != null) { + dir = conf.get("storm.local.dir") + FILE_SEPARATOR + "logs"; + } else { dir = concatIfNotNull(System.getProperty("storm.home")) + FILE_SEPARATOR + "logs"; } try { http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 8c383e5..fe804d7 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -145,7 +145,7 @@ stats (:executor-stats curr-beat)] (.workerHeartbeat state storm-id node port (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 - :executor-stats (merge stats {executor (clojurify-structure (.renderStats (BoltExecutorStats/mkBoltStats 20)))})}) + :executor-stats (merge stats {executor (clojurify-structure (.renderStats (BoltExecutorStats. 20)))})}) ))) (defn slot-assignments [cluster storm-id] http://git-wip-us.apache.org/repos/asf/storm/blob/abe9b676/storm-core/test/clj/org/apache/storm/supervisor_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj index cdd66e4..415a56d 100644 --- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj +++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj @@ -297,6 +297,7 @@ (let [mock-port "42" mock-storm-id "fake-storm-id" mock-worker-id "fake-worker-id" + storm-log-dir (ConfigUtils/getLogDir) mock-cp (str Utils/FILE_PATH_SEPARATOR "base" Utils/CLASS_PATH_SEPARATOR Utils/FILE_PATH_SEPARATOR "stormjar.jar") mock-sensitivity "S3" mock-cp "/base:/stormjar.jar" @@ -308,7 +309,7 @@ (str "-Dstorm.id=" mock-storm-id) (str "-Dworker.id=" mock-worker-id) (str "-Dworker.port=" mock-port) - "-Dstorm.log.dir=/logs" + (str "-Dstorm.log.dir=" storm-log-dir) "-Dlog4j.configurationFile=/log4j2/worker.xml" "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector" "org.apache.storm.LogWriter"] @@ -321,7 +322,7 @@ "-Dworkers.artifacts=/tmp/workers-artifacts" "-Dstorm.conf.file=" "-Dstorm.options=" - (str "-Dstorm.log.dir=" Utils/FILE_PATH_SEPARATOR "logs") + (str "-Dstorm.log.dir=" storm-log-dir) (str "-Dlogging.sensitivity=" mock-sensitivity) (str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml") "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector" @@ -484,6 +485,7 @@ mock-cp "mock-classpath'quote-on-purpose" attrs (make-array FileAttribute 0) storm-local (.getCanonicalPath (.toFile (Files/createTempDirectory "storm-local" attrs))) + storm-log-dir (ConfigUtils/getLogDir) worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh") exp-launch ["/bin/worker-launcher" "me" @@ -499,7 +501,7 @@ " '-Dstorm.id=" mock-storm-id "'" " '-Dworker.id=" mock-worker-id "'" " '-Dworker.port=" mock-port "'" - " '-Dstorm.log.dir=/logs'" + " '-Dstorm.log.dir=" storm-log-dir "'" " '-Dlog4j.configurationFile=/log4j2/worker.xml'" " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'" " 'org.apache.storm.LogWriter'" @@ -512,7 +514,7 @@ " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'") " '-Dstorm.conf.file='" " '-Dstorm.options='" - " '-Dstorm.log.dir=/logs'" + " '-Dstorm.log.dir=" storm-log-dir "'" " '-Dlogging.sensitivity=" mock-sensitivity "'" " '-Dlog4j.configurationFile=/log4j2/worker.xml'" " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'" @@ -836,3 +838,4 @@ {"sup1" [3 4]} (get-storm-id (:storm-cluster-state cluster) "topology2")) ))) +
