upmerge from master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/000fcb86 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/000fcb86 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/000fcb86 Branch: refs/heads/master Commit: 000fcb86592772c6e51962c565c497c8b052aa47 Parents: 4c246d1 b477939 Author: 卫乐 <[email protected]> Authored: Tue Mar 8 20:51:34 2016 +0800 Committer: 卫乐 <[email protected]> Committed: Tue Mar 8 20:51:34 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 4 + .../travis/print-errors-from-test-reports.py | 4 + .../src/clj/org/apache/storm/daemon/common.clj | 13 +- .../src/clj/org/apache/storm/daemon/drpc.clj | 30 +-- .../clj/org/apache/storm/daemon/logviewer.clj | 27 ++- .../src/clj/org/apache/storm/daemon/nimbus.clj | 189 +++++++++---------- .../clj/org/apache/storm/daemon/supervisor.clj | 12 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 81 ++++---- .../src/clj/org/apache/storm/ui/helpers.clj | 10 +- .../storm/metric/StormMetricsRegistry.java | 84 +++++++++ .../auth/AbstractSaslClientCallbackHandler.java | 76 ++++++++ .../auth/AbstractSaslServerCallbackHandler.java | 94 +++++++++ .../auth/digest/ClientCallbackHandler.java | 60 +----- .../auth/digest/ServerCallbackHandler.java | 61 +----- .../auth/plain/PlainClientCallbackHandler.java | 31 +++ .../auth/plain/PlainSaslTransportPlugin.java | 71 +++++++ .../auth/plain/PlainServerCallbackHandler.java | 55 ++++++ .../security/auth/plain/SaslPlainServer.java | 158 ++++++++++++++++ 18 files changed, 763 insertions(+), 297 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/000fcb86/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 997f92c,0af12a2..f2e60bf --- 
a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@@ -15,7 -15,7 +15,8 @@@ ;; limitations under the License. (ns org.apache.storm.daemon.nimbus (:import [org.apache.thrift.server THsHaServer THsHaServer$Args] - [org.apache.storm.stats StatsUtil]) ++ [org.apache.storm.stats StatsUtil] + [org.apache.storm.metric StormMetricsRegistry]) (:import [org.apache.storm.generated KeyNotFoundException]) (:import [org.apache.storm.blobstore LocalFsBlobStore]) (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory]) @@@ -559,7 -558,36 +558,6 @@@ executor->component (:launch-time-secs storm-base)))) -;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that -;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and -;; tracked through heartbeat-cache -(defn- update-executor-cache [curr hb timeout] - (let [reported-time (:time-secs hb) - {last-nimbus-time :nimbus-time - last-reported-time :executor-reported-time} curr - reported-time (cond reported-time reported-time - last-reported-time last-reported-time - :else 0) - nimbus-time (if (or (not last-nimbus-time) - (not= last-reported-time reported-time)) - (Time/currentTimeSecs) - last-nimbus-time - )] - {:is-timed-out (and - nimbus-time - (>= (Time/deltaSecs nimbus-time) timeout)) - :nimbus-time nimbus-time - :executor-reported-time reported-time - :heartbeat hb})) - -(defn update-heartbeat-cache [cache executor-beats all-executors timeout] - (let [cache (select-keys cache all-executors)] - (into {} - (for [executor all-executors :let [curr (cache executor)]] - [executor - (update-executor-cache curr (get executor-beats executor) timeout)] - )))) -- (defn update-heartbeats! 
[nimbus storm-id all-executors existing-assignment] (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors)) (let [storm-cluster-state (:storm-cluster-state nimbus) @@@ -1455,11 -1486,11 +1453,11 @@@ (fn [] (renew-credentials nimbus))) - (defgauge nimbus:num-supervisors - (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil)))) - - (start-metrics-reporters conf) + (def nimbus:num-supervisors (StormMetricsRegistry/registerGauge "nimbus:num-supervisors" + (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))) + (StormMetricsRegistry/startMetricsReporters conf) - ++ (reify Nimbus$Iface (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology @@@ -1887,12 -1918,11 +1885,11 @@@ (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)])) (into {})) executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)] - (let [host (-> assignment :node->host (get node)) - heartbeat (.get beats (StatsUtil/convertExecutor executor)) - excutorstats (.get (.get heartbeat "heartbeat") "stats") - (let [host (-> assignment :node->host (get node)) - heartbeat (get beats executor) - excutorstats (:stats heartbeat) -- excutorstats (if excutorstats - (StatsUtil/thriftifyExecutorStats excutorstats))] - - (stats/thriftify-executor-stats excutorstats))] ++ (let [host (-> assignment :node->host (get node)) ++ heartbeat (.get beats (StatsUtil/convertExecutor executor)) ++ excutorstats (.get (.get heartbeat "heartbeat") "stats") ++ excutorstats (if excutorstats ++ (StatsUtil/thriftifyExecutorStats excutorstats))] (doto (ExecutorSummary. (thriftify-executor-id executor) (-> executor first task->component) @@@ -2079,17 -2109,21 +2076,18 @@@ (^TopologyPageInfo getTopologyPageInfo [this ^String topo-id ^String window ^boolean include-sys?] - (mark! 
nimbus:num-getTopologyPageInfo-calls) + (.mark nimbus:num-getTopologyPageInfo-calls) (let [info (get-common-topo-info topo-id "getTopologyPageInfo") + exec->node+port (:executor->node+port (:assignment info)) - last-err-fn (partial get-last-error - (:storm-cluster-state info) - topo-id) - topo-page-info (stats/agg-topo-execs-stats topo-id - exec->node+port - (:task->component info) - (:beats info) - (:topology info) - window - include-sys? - last-err-fn)] + topo-page-info (StatsUtil/aggTopoExecsStats topo-id - exec->node+port - (:task->component info) - (:beats info) - (:topology info) - window - include-sys? - (:storm-cluster-state info))] ++ exec->node+port ++ (:task->component info) ++ (:beats info) ++ (:topology info) ++ window ++ include-sys? ++ (:storm-cluster-state info))] (when-let [owner (:owner (:base info))] (.set_owner topo-page-info owner)) (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)] http://git-wip-us.apache.org/repos/asf/storm/blob/000fcb86/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/ui/core.clj index 0730d96,e1ab71f..a538876 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@@ -21,15 -21,14 +21,15 @@@ ring.middleware.multipart-params) (:use [ring.middleware.json :only [wrap-json-params]]) (:use [hiccup core page-helpers]) - (:use [org.apache.storm config util log stats converter]) + (:use [org.apache.storm config util log converter]) (:use [org.apache.storm.ui helpers]) (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID - ACKER-FAIL-STREAM-ID mk-authorization-handler - start-metrics-reporters]]]) - ACKER-FAIL-STREAM-ID mk-authorization-handler]]]) ++ ACKER-FAIL-STREAM-ID mk-authorization-handler]]]) (:import [org.apache.storm.utils Time] [org.apache.storm.generated NimbusSummary] + 
[org.apache.storm.stats StatsUtil] - [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration]) + [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration] + [org.apache.storm.metric StormMetricsRegistry]) (:use [clojure.string :only [blank? lower-case trim split]]) (:import [org.apache.storm.generated ExecutorSpecificStats ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
