upmerge from master

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/000fcb86
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/000fcb86
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/000fcb86

Branch: refs/heads/master
Commit: 000fcb86592772c6e51962c565c497c8b052aa47
Parents: 4c246d1 b477939
Author: 卫乐 <[email protected]>
Authored: Tue Mar 8 20:51:34 2016 +0800
Committer: 卫乐 <[email protected]>
Committed: Tue Mar 8 20:51:34 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   4 +
 .../travis/print-errors-from-test-reports.py    |   4 +
 .../src/clj/org/apache/storm/daemon/common.clj  |  13 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  30 +--
 .../clj/org/apache/storm/daemon/logviewer.clj   |  27 ++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 189 +++++++++----------
 .../clj/org/apache/storm/daemon/supervisor.clj  |  12 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  81 ++++----
 .../src/clj/org/apache/storm/ui/helpers.clj     |  10 +-
 .../storm/metric/StormMetricsRegistry.java      |  84 +++++++++
 .../auth/AbstractSaslClientCallbackHandler.java |  76 ++++++++
 .../auth/AbstractSaslServerCallbackHandler.java |  94 +++++++++
 .../auth/digest/ClientCallbackHandler.java      |  60 +-----
 .../auth/digest/ServerCallbackHandler.java      |  61 +-----
 .../auth/plain/PlainClientCallbackHandler.java  |  31 +++
 .../auth/plain/PlainSaslTransportPlugin.java    |  71 +++++++
 .../auth/plain/PlainServerCallbackHandler.java  |  55 ++++++
 .../security/auth/plain/SaslPlainServer.java    | 158 ++++++++++++++++
 18 files changed, 763 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/000fcb86/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 997f92c,0af12a2..f2e60bf
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@@ -15,7 -15,7 +15,8 @@@
  ;; limitations under the License.
  (ns org.apache.storm.daemon.nimbus
    (:import [org.apache.thrift.server THsHaServer THsHaServer$Args]
-            [org.apache.storm.stats StatsUtil])
++           [org.apache.storm.stats StatsUtil]
+            [org.apache.storm.metric StormMetricsRegistry])
    (:import [org.apache.storm.generated KeyNotFoundException])
    (:import [org.apache.storm.blobstore LocalFsBlobStore])
    (:import [org.apache.thrift.protocol TBinaryProtocol 
TBinaryProtocol$Factory])
@@@ -559,7 -558,36 +558,6 @@@
                        executor->component
                        (:launch-time-secs storm-base))))
  
 -;; Does not assume that clocks are synchronized. Executor heartbeat is only 
used so that
 -;; nimbus knows when it's received a new heartbeat. All timing is done by 
nimbus and
 -;; tracked through heartbeat-cache
 -(defn- update-executor-cache [curr hb timeout]
 -  (let [reported-time (:time-secs hb)
 -        {last-nimbus-time :nimbus-time
 -         last-reported-time :executor-reported-time} curr
 -        reported-time (cond reported-time reported-time
 -                            last-reported-time last-reported-time
 -                            :else 0)
 -        nimbus-time (if (or (not last-nimbus-time)
 -                        (not= last-reported-time reported-time))
 -                      (Time/currentTimeSecs)
 -                      last-nimbus-time
 -                      )]
 -      {:is-timed-out (and
 -                       nimbus-time
 -                       (>= (Time/deltaSecs nimbus-time) timeout))
 -       :nimbus-time nimbus-time
 -       :executor-reported-time reported-time
 -       :heartbeat hb}))
 -
 -(defn update-heartbeat-cache [cache executor-beats all-executors timeout]
 -  (let [cache (select-keys cache all-executors)]
 -    (into {}
 -      (for [executor all-executors :let [curr (cache executor)]]
 -        [executor
 -         (update-executor-cache curr (get executor-beats executor) timeout)]
 -         ))))
--
  (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
    (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors))
    (let [storm-cluster-state (:storm-cluster-state nimbus)
@@@ -1455,11 -1486,11 +1453,11 @@@
        (fn []
          (renew-credentials nimbus)))
  
-     (defgauge nimbus:num-supervisors
-       (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
- 
-     (start-metrics-reporters conf)
+     (def nimbus:num-supervisors (StormMetricsRegistry/registerGauge 
"nimbus:num-supervisors"
+       (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil)))))
  
+     (StormMetricsRegistry/startMetricsReporters conf)
 -
++    
      (reify Nimbus$Iface
        (^void submitTopologyWithOpts
          [this ^String storm-name ^String uploadedJarLocation ^String 
serializedConf ^StormTopology topology
@@@ -1887,12 -1918,11 +1885,11 @@@
                            (map (fn [c] [c (errors-fn storm-cluster-state 
storm-id c)]))
                            (into {}))
                executor-summaries (dofor [[executor [node port]] 
(:executor->node+port assignment)]
-                                      (let [host (-> assignment :node->host 
(get node))
-                                               heartbeat (.get beats 
(StatsUtil/convertExecutor executor))
-                                               excutorstats (.get (.get 
heartbeat "heartbeat") "stats")
 -                                        (let [host (-> assignment :node->host 
(get node))
 -                                              heartbeat (get beats executor)
 -                                              excutorstats (:stats heartbeat)
--                                              excutorstats (if excutorstats
-                                                       
(StatsUtil/thriftifyExecutorStats excutorstats))]
-                                               
 -                                                      
(stats/thriftify-executor-stats excutorstats))]
++                                   (let [host (-> assignment :node->host (get 
node))
++                                            heartbeat (.get beats 
(StatsUtil/convertExecutor executor))
++                                            excutorstats (.get (.get 
heartbeat "heartbeat") "stats")
++                                            excutorstats (if excutorstats
++                                                    
(StatsUtil/thriftifyExecutorStats excutorstats))]
                                            (doto
                                                (ExecutorSummary. 
(thriftify-executor-id executor)
                                                                  (-> executor 
first task->component)
@@@ -2079,17 -2109,21 +2076,18 @@@
  
        (^TopologyPageInfo getTopologyPageInfo
          [this ^String topo-id ^String window ^boolean include-sys?]
-         (mark! nimbus:num-getTopologyPageInfo-calls)
+         (.mark nimbus:num-getTopologyPageInfo-calls)
          (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
+ 
                exec->node+port (:executor->node+port (:assignment info))
 -              last-err-fn (partial get-last-error
 -                                   (:storm-cluster-state info)
 -                                   topo-id)
 -              topo-page-info (stats/agg-topo-execs-stats topo-id
 -                                                         exec->node+port
 -                                                         (:task->component 
info)
 -                                                         (:beats info)
 -                                                         (:topology info)
 -                                                         window
 -                                                         include-sys?
 -                                                         last-err-fn)]
 +              topo-page-info (StatsUtil/aggTopoExecsStats topo-id
-                                                          exec->node+port
-                                                          (:task->component 
info)
-                                                          (:beats info)
-                                                          (:topology info)
-                                                          window
-                                                          include-sys?
-                                                          
(:storm-cluster-state info))]
++                                                          exec->node+port
++                                                          (:task->component 
info)
++                                                          (:beats info)
++                                                          (:topology info)
++                                                          window
++                                                          include-sys?
++                                                          
(:storm-cluster-state info))]
            (when-let [owner (:owner (:base info))]
              (.set_owner topo-page-info owner))
            (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]

http://git-wip-us.apache.org/repos/asf/storm/blob/000fcb86/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/ui/core.clj
index 0730d96,e1ab71f..a538876
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@@ -21,15 -21,14 +21,15 @@@
          ring.middleware.multipart-params)
    (:use [ring.middleware.json :only [wrap-json-params]])
    (:use [hiccup core page-helpers])
 -  (:use [org.apache.storm config util log stats converter])
 +  (:use [org.apache.storm config util log converter])
    (:use [org.apache.storm.ui helpers])
    (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID 
ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
-                                               ACKER-FAIL-STREAM-ID 
mk-authorization-handler
-                                               start-metrics-reporters]]])
 -                                              ACKER-FAIL-STREAM-ID 
mk-authorization-handler]]])
++                                                ACKER-FAIL-STREAM-ID 
mk-authorization-handler]]])
    (:import [org.apache.storm.utils Time]
             [org.apache.storm.generated NimbusSummary]
 +           [org.apache.storm.stats StatsUtil]
-            [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration])
+            [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration]
+            [org.apache.storm.metric StormMetricsRegistry])
    (:use [clojure.string :only [blank? lower-case trim split]])
    (:import [org.apache.storm.generated ExecutorSpecificStats
              ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo 
SpoutStats BoltStats

Reply via email to