Merge branch 'master' into supervisor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69c8b3c3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69c8b3c3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69c8b3c3 Branch: refs/heads/master Commit: 69c8b3c31d4ee528aea58f716b092c24ba6b0b1a Parents: f78c36d 6390d18 Author: xiaojian.fxj <[email protected]> Authored: Thu Mar 10 23:26:42 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Thu Mar 10 23:26:42 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 1 + .../src/clj/org/apache/storm/converter.clj | 15 + .../src/clj/org/apache/storm/daemon/common.clj | 350 +----------- .../clj/org/apache/storm/daemon/executor.clj | 24 +- .../clj/org/apache/storm/daemon/logviewer.clj | 2 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 63 +-- .../src/clj/org/apache/storm/daemon/task.clj | 5 +- .../src/clj/org/apache/storm/daemon/worker.clj | 24 +- storm-core/src/clj/org/apache/storm/testing.clj | 98 ++-- storm-core/src/clj/org/apache/storm/ui/core.clj | 15 +- .../org/apache/storm/daemon/DaemonCommon.java | 22 + .../org/apache/storm/daemon/StormCommon.java | 537 +++++++++++++++++++ .../storm/utils/StormCommonInstaller.java | 43 ++ .../src/jvm/org/apache/storm/utils/Utils.java | 16 + .../org/apache/storm/integration_test.clj | 6 +- .../test/clj/org/apache/storm/nimbus_test.clj | 121 +++-- .../apache/storm/security/auth/auth_test.clj | 3 +- .../clj/org/apache/storm/supervisor_test.clj | 11 +- 18 files changed, 833 insertions(+), 523 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/testing.clj index dd1f2df,bda09ee..4cec39a --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@@ -290,13 -286,13 +291,13 @@@ ([cluster-map timeout-ms] ;; wait until all workers, supervisors, and nimbus is waiting (let [supervisors @(:supervisors cluster-map) - workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles))) + workers (filter (partial instance? DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles))) daemons (concat [(:nimbus cluster-map)] - supervisors ; because a worker may already be dead workers)] - (while-timeout timeout-ms (not (every? (memfn isWaiting) daemons)) + (while-timeout timeout-ms (or (not (every? (memfn waiting?) daemons)) + (not (every? is-supervisor-waiting supervisors))) (Thread/sleep (rand-int 20)) ;; (doseq [d daemons] ;; (if-not ((memfn waiting?) d) @@@ -377,26 -373,27 +378,28 @@@ (defn submit-mocked-assignment [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources] - (with-var-roots [common/storm-task-info (fn [& ignored] task->component) - nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments) - nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources - storm-cluster-state - storm-name - worker->resources) - nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port - storm-cluster-state - storm-name - executor->node+port)] - (submit-local-topology nimbus storm-name conf topology))) + (let [fake-common (proxy [StormCommon] [] + (stormTaskInfoImpl [_] task->component))] + (with-open [- (StormCommonInstaller. fake-common)] + (with-var-roots [nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments) + nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources + storm-cluster-state + storm-name + worker->resources) + nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port + storm-cluster-state + storm-name + executor->node+port)] + (submit-local-topology nimbus storm-name conf topology))))) (defn mk-capture-launch-fn [capture-atom] - (fn [supervisor storm-id port worker-id mem-onheap] - (let [supervisor-id (:supervisor-id supervisor) - conf (:conf supervisor) - existing (get @capture-atom [supervisor-id port] [])] - (ConfigUtils/setWorkerUserWSE conf worker-id "") - (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id))))) + (fn [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + supervisorId (.getSupervisorId supervisorData) + existing (get @capture-atom [supervisorId port] [])] + (log-message "mk-capture-launch-fn") + (ConfigUtils/setWorkerUserWSE conf workerId "") + (swap! capture-atom assoc [supervisorId port] (conj existing stormId))))) (defn find-worker-id [supervisor-conf port] http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/69c8b3c3/storm-core/test/clj/org/apache/storm/supervisor_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj index 2ff21ac,ade1c2f..d3d7344 --- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj +++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj @@@ -37,9 -34,10 +37,10 @@@ (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils] [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller]) (:import [java.nio.file.attribute FileAttribute]) + (:import [org.apache.storm.daemon StormCommon]) (:use [org.apache.storm config testing util log converter]) (:use [org.apache.storm.daemon common]) - (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]) + (:require [org.apache.storm.daemon [worker :as worker] [local-supervisor :as local-supervisor]]) (:use [conjure core]) (:require [clojure.java.io :as io])) @@@ -871,8 -840,9 +872,8 @@@ )) (validate-launched-once (:launched changed) {"sup1" [1 2]} - (get-storm-id (:storm-cluster-state cluster) "topology1")) + (StormCommon/getStormId (:storm-cluster-state cluster) "topology1")) (validate-launched-once (:launched changed) {"sup1" [3 4]} - (get-storm-id (:storm-cluster-state cluster) "topology2")) + (StormCommon/getStormId (:storm-cluster-state cluster) "topology2")) ))) -
