update test codes about supervisor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/19fcafbd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/19fcafbd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/19fcafbd Branch: refs/heads/master Commit: 19fcafbd0fe1cbee49e797824c47ba1f6b727270 Parents: b281c73 Author: xiaojian.fxj <[email protected]> Authored: Wed Mar 2 09:00:37 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Fri Mar 4 09:21:09 2016 +0800 ---------------------------------------------------------------------- bin/storm.cmd | 2 +- bin/storm.py | 2 +- .../org/apache/storm/command/kill_workers.clj | 14 +- .../apache/storm/daemon/local_supervisor.clj | 61 + .../clj/org/apache/storm/daemon/logviewer.clj | 8 +- .../clj/org/apache/storm/daemon/supervisor.clj | 1356 ------------------ storm-core/src/clj/org/apache/storm/testing.clj | 57 +- .../storm/daemon/supervisor/ShutdownWork.java | 11 +- .../daemon/supervisor/StandaloneSupervisor.java | 7 +- .../storm/daemon/supervisor/Supervisor.java | 196 +++ .../storm/daemon/supervisor/SupervisorData.java | 5 +- .../daemon/supervisor/SupervisorServer.java | 201 --- .../daemon/supervisor/SupervisorUtils.java | 108 +- .../daemon/supervisor/SyncProcessEvent.java | 246 ++-- .../daemon/supervisor/SyncSupervisorEvent.java | 11 +- .../supervisor/timer/RunProfilerActions.java | 2 - .../supervisor/timer/SupervisorHeartbeat.java | 12 +- .../staticmocking/MockedSupervisorUtils.java | 31 + .../src/jvm/org/apache/storm/utils/Utils.java | 4 +- .../clj/org/apache/storm/logviewer_test.clj | 36 +- .../clj/org/apache/storm/supervisor_test.clj | 300 ++-- 21 files changed, 775 insertions(+), 1895 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/bin/storm.cmd ---------------------------------------------------------------------- diff --git a/bin/storm.cmd b/bin/storm.cmd index 1ef1e42..e84bfb3 100644 --- a/bin/storm.cmd +++ b/bin/storm.cmd @@ -214,7 +214,7 @@ goto :eof :supervisor - set CLASS=org.apache.storm.daemon.supervisor + set CLASS=org.apache.storm.daemon.supervisor.Supervisor "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue supervisor.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/bin/storm.py ---------------------------------------------------------------------- diff --git a/bin/storm.py b/bin/storm.py index 94d6143..a669783 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -552,7 +552,7 @@ def pacemaker(klass="org.apache.storm.pacemaker.pacemaker"): extrajars=cppaths, jvmopts=jvmopts) -def supervisor(klass="org.apache.storm.daemon.supervisor"): +def supervisor(klass="org.apache.storm.daemon.supervisor.Supervisor"): """Syntax: [storm supervisor] Launches the supervisor daemon. This command should be run http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/clj/org/apache/storm/command/kill_workers.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj index 4e713f9..a7de176 100644 --- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj +++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj @@ -14,11 +14,10 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns org.apache.storm.command.kill-workers - (:import [java.io File]) + (:import [java.io File] + [org.apache.storm.daemon.supervisor SupervisorUtils StandaloneSupervisor SupervisorData ShutdownWork]) (:use [org.apache.storm.daemon common]) (:use [org.apache.storm util config]) - (:require [org.apache.storm.daemon - [supervisor :as supervisor]]) (:import [org.apache.storm.utils ConfigUtils]) (:gen-class)) @@ -27,8 +26,9 @@ [& args] (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath)) - isupervisor (supervisor/standalone-supervisor) - supervisor-data (supervisor/supervisor-data conf nil isupervisor) - ids (supervisor/my-worker-ids conf)] + isupervisor (StandaloneSupervisor.) + supervisor-data (SupervisorData. conf nil isupervisor) + ids (SupervisorUtils/myWorkerIds conf) + shut-workers (ShutdownWork.)] (doseq [id ids] - (supervisor/shutdown-worker supervisor-data id)))) + (.shutWorker shut-workers supervisor-data id)))) http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj new file mode 100644 index 0000000..65cf907 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj @@ -0,0 +1,61 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.daemon.local-supervisor + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] + [org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) + +(defn launch-local-worker [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + pid (Utils/uuid) + worker (worker/mk-worker conf + (.getSharedContext supervisorData) + stormId + (.getAssignmentId supervisorData) + (int port) + workerId)] + (ConfigUtils/setWorkerUserWSE conf workerId "") + (ProcessSimulator/registerProcess pid worker) + (.put (.getWorkerThreadPidsAtom supervisorData) workerId pid) + )) + +(defn shutdown-local-worker [supervisorData workerId] + (let [shut-workers (ShutdownWork.)] + (log-message "shutdown-local-worker") + (.shutWorker shut-workers supervisorData workerId))) + +(defn local-process [] + "Create a local process event" + (proxy [SyncProcessEvent] [] + (launchLocalWorker [supervisorData stormId port workerId resources] + (launch-local-worker supervisorData stormId port workerId resources)) + (shutWorker [supervisorData workerId] (shutdown-local-worker supervisorData workerId)))) + + +(defserverfn mk-local-supervisor [conf shared-context isupervisor] + (log-message "Starting local Supervisor with conf " conf) + (if (not (ConfigUtils/isLocalMode conf)) + (throw + (IllegalArgumentException. "Cannot start server in distrubuted mode!"))) + (let [local-process (local-process) + supervisor-server (Supervisor.)] + (.setLocalSyncProcess supervisor-server local-process) + (.mkSupervisor supervisor-server conf shared-context isupervisor))) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj index 221dad7..38ac3ee 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj @@ -20,7 +20,8 @@ (:use [hiccup core page-helpers form-helpers]) (:use [org.apache.storm config util log]) (:use [org.apache.storm.ui helpers]) - (:import [org.apache.storm StormTimer]) + (:import [org.apache.storm StormTimer] + [org.apache.storm.daemon.supervisor SupervisorUtils]) (:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils]) (:import [org.slf4j LoggerFactory]) (:import [java.util Arrays ArrayList HashSet]) @@ -38,7 +39,6 @@ [org.yaml.snakeyaml.constructor SafeConstructor]) (:import [org.apache.storm.ui InvalidRequestException UIHelpers IConfigurator FilterConfiguration] [org.apache.storm.security.auth AuthUtils]) - (:require [org.apache.storm.daemon common [supervisor :as supervisor]]) (:require [compojure.route :as route] [compojure.handler :as handler] [ring.middleware.keyword-params] @@ -159,10 +159,10 @@ (defn get-alive-ids [conf now-secs] (->> - (supervisor/read-worker-heartbeats conf) + (clojurify-structure (SupervisorUtils/readWorkerHeartbeats conf)) (remove #(or (not (val %)) - (supervisor/is-worker-hb-timed-out? now-secs + (SupervisorUtils/isWorkerHbTimedOut now-secs (val %) conf))) keys http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj deleted file mode 100644 index 7295679..0000000 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ /dev/null @@ -1,1356 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.daemon.supervisor - (:import [java.io File IOException FileOutputStream]) - (:import [org.apache.storm.scheduler ISupervisor] - [org.apache.storm.utils LocalState Time Utils Utils$ExitCodeCallable - ConfigUtils] - [org.apache.storm.daemon Shutdownable] - [org.apache.storm Constants] - [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils IStateStorage] - [java.net JarURLConnection] - [java.net URI URLDecoder] - [org.apache.commons.io FileUtils]) - (:use [org.apache.storm config util log converter local-state-converter]) - (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) - (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo]) - (:import [java.nio.file Files StandardCopyOption]) - (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment]) - (:import [org.apache.storm Config ProcessSimulator]) - (:import [org.apache.storm.localizer LocalResource]) - (:import [org.apache.storm.event EventManagerImp]) - (:use [org.apache.storm.daemon common]) - (:import [org.apache.storm.command HealthCheck]) - (:require [org.apache.storm.daemon [worker :as worker]] - - [clojure.set :as set]) - (:import [org.apache.thrift.transport TTransportException]) - (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) - (:import [org.yaml.snakeyaml Yaml] - [org.yaml.snakeyaml.constructor SafeConstructor]) - (:require [metrics.gauges :refer [defgauge]]) - (:require [metrics.meters :refer [defmeter mark!]]) - (:import [org.apache.storm StormTimer]) - (:gen-class - :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]]) - (:require [clojure.string :as str])) - -(defmeter supervisor:num-workers-launched) - -(defmulti download-storm-code cluster-mode) -(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor)))) - -(def STORM-VERSION (VersionInfo/getVersion)) - -(defprotocol SupervisorDaemon - (get-id [this]) - (get-conf [this]) - (shutdown-all-workers [this]) - ) - -;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE -(defn- assignments-snapshot [storm-cluster-state callback assignment-versions] - (let [storm-ids (.assignments storm-cluster-state callback)] - (let [new-assignments - (->> - (dofor [sid storm-ids] - (let [recorded-version (:version (get assignment-versions sid))] - (if-let [assignment-version (.assignmentVersion storm-cluster-state sid callback)] - (if (= assignment-version recorded-version) - {sid (get assignment-versions sid)} - (let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state sid callback) - assignment (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA)))] - {sid {:data assignment :version (.get thriftify-assignment-version (IStateStorage/VERSION))}})) - {sid nil}))) - (apply merge) - (filter-val not-nil?)) - new-profiler-actions - (->> - (dofor [sid (distinct storm-ids)] - - (if-let [topo-profile-actions (into [] (for [request (.getTopologyProfileRequests storm-cluster-state sid)] (clojurify-profile-request request)))] - {sid topo-profile-actions})) - (apply merge))] - {:assignments (into {} (for [[k v] new-assignments] [k (:data v)])) - :profiler-actions new-profiler-actions - :versions new-assignments}))) - -(defn mk-local-assignment - [storm-id executors resources] - {:storm-id storm-id :executors executors :resources resources}) - -(defn- read-my-executors [assignments-snapshot storm-id assignment-id] - (let [assignment (get assignments-snapshot storm-id) - my-slots-resources (into {} - (filter (fn [[[node _] _]] (= node assignment-id)) - (:worker->resources assignment))) - my-executors (filter (fn [[_ [node _]]] (= node assignment-id)) - (:executor->node+port assignment)) - port-executors (apply merge-with - concat - (for [[executor [_ port]] my-executors] - {port [executor]} - ))] - (into {} (for [[port executors] port-executors] - ;; need to cast to int b/c it might be a long (due to how yaml parses things) - ;; doall is to avoid serialization/deserialization problems with lazy seqs - [(Integer. port) (mk-local-assignment storm-id (doall executors) (get my-slots-resources [assignment-id port]))] - )))) - -(defn- read-assignments - "Returns map from port to struct containing :storm-id, :executors and :resources" - ([assignments-snapshot assignment-id] - (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id)) - (apply merge-with (fn [& ignored] (throw (RuntimeException. (str "Should not have multiple topologies assigned to one port"))))))) - ([assignments-snapshot assignment-id existing-assignment retries] - (try (let [assignments (read-assignments assignments-snapshot assignment-id)] - (reset! retries 0) - assignments) - (catch RuntimeException e - (if (> @retries 2) (throw e) (swap! retries inc)) - (log-warn (.getMessage e) ": retrying " @retries " of 3") - existing-assignment)))) - -;TODO: when translating this function, you should replace the map-val with a proper for loop HERE -(defn- read-storm-code-locations - [assignments-snapshot] - (map-val :master-code-dir assignments-snapshot)) - -(defn- read-downloaded-storm-ids [conf] - (map #(URLDecoder/decode %) (Utils/readDirContents (ConfigUtils/supervisorStormDistRoot conf)))) - -(defn ->executor-list - [executors] - (into [] - (for [exec-info executors] - [(.get_task_start exec-info) (.get_task_end exec-info)]))) - -(defn ls-worker-heartbeat - [^LocalState local-state] - (if-let [worker-hb (.getWorkerHeartBeat ^LocalState local-state)] - {:time-secs (.get_time_secs worker-hb) - :storm-id (.get_topology_id worker-hb) - :executors (->executor-list (.get_executors worker-hb)) - :port (.get_port worker-hb)})) - -(defn read-worker-heartbeat [conf id] - (let [local-state (ConfigUtils/workerState conf id)] - (try - (ls-worker-heartbeat local-state) - (catch Exception e - (log-warn e "Failed to read local heartbeat for workerId : " id ",Ignoring exception.") - nil)))) - - -(defn my-worker-ids [conf] - (Utils/readDirContents (ConfigUtils/workerRoot conf))) - -(defn read-worker-heartbeats - "Returns map from worker id to heartbeat" - [conf] - (let [ids (my-worker-ids conf)] - (into {} - (dofor [id ids] - [id (read-worker-heartbeat conf id)])) - )) - - -(defn matches-an-assignment? [worker-heartbeat assigned-executors] - (let [local-assignment (assigned-executors (:port worker-heartbeat))] - (and local-assignment - (= (:storm-id worker-heartbeat) (:storm-id local-assignment)) - (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID) - (set (:executors local-assignment)))))) - -(let [dead-workers (atom #{})] - (defn get-dead-workers [] - @dead-workers) - (defn add-dead-worker [worker] - (swap! dead-workers conj worker)) - (defn remove-dead-worker [worker] - (swap! dead-workers disj worker))) - -(defn is-worker-hb-timed-out? [now hb conf] - (> (- now (:time-secs hb)) - (conf SUPERVISOR-WORKER-TIMEOUT-SECS))) - -(defn read-allocated-workers - "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)" - [supervisor assigned-executors now] - (let [conf (:conf supervisor) - ^LocalState local-state (:local-state supervisor) - id->heartbeat (read-worker-heartbeats conf) - approved-ids (set (keys (clojurify-structure (.getApprovedWorkers ^LocalState local-state))))] - (into - {} - (dofor [[id hb] id->heartbeat] - (let [state (cond - (not hb) - :not-started - (or (not (contains? approved-ids id)) - (not (matches-an-assignment? hb assigned-executors))) - :disallowed - (or - (when (get (get-dead-workers) id) - (log-message "Worker Process " id " has died!") - true) - (is-worker-hb-timed-out? now hb conf)) - :timed-out - true - :valid)] - (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now) - [id [state hb]] - )) - ))) - -(defn- wait-for-worker-launch [conf id start-time] - (let [state (ConfigUtils/workerState conf id)] - (loop [] - (let [hb (.getWorkerHeartBeat state)] - (when (and - (not hb) - (< - (- (Time/currentTimeSecs) start-time) - (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS) - )) - (log-message id " still hasn't started") - (Time/sleep 500) - (recur) - ))) - (when-not (.getWorkerHeartBeat state) - (log-message "Worker " id " failed to start") - ))) - -(defn- wait-for-workers-launch [conf ids] - (let [start-time (Time/currentTimeSecs)] - (doseq [id ids] - (wait-for-worker-launch conf id start-time)) - )) - -(defn generate-supervisor-id [] - (Utils/uuid)) - -(defnk worker-launcher [conf user args :environment {} :log-prefix nil :exit-code-callback nil :directory nil] - (let [_ (when (clojure.string/blank? user) - (throw (java.lang.IllegalArgumentException. - "User cannot be blank when calling worker-launcher."))) - wl-initial (conf SUPERVISOR-WORKER-LAUNCHER) - storm-home (System/getProperty "storm.home") - wl (if wl-initial wl-initial (str storm-home "/bin/worker-launcher")) - command (concat [wl user] args)] - (log-message "Running as user:" user " command:" (pr-str command)) - (Utils/launchProcess command - environment - log-prefix - exit-code-callback - directory))) - -(defnk worker-launcher-and-wait [conf user args :environment {} :log-prefix nil] - (let [process (worker-launcher conf user args :environment environment)] - (if log-prefix - (Utils/readAndLogStream log-prefix (.getInputStream process))) - (try - (.waitFor process) - (catch InterruptedException e - (log-message log-prefix " interrupted."))) - (.exitValue process))) - -(defn- rmr-as-user - "Launches a process owned by the given user that deletes the given path - recursively. Throws RuntimeException if the directory is not removed." - [conf id path] - (let [user (Utils/getFileOwner path)] - (worker-launcher-and-wait conf - user - ["rmr" path] - :log-prefix (str "rmr " id)) - (if (Utils/checkFileExists path) - (throw (RuntimeException. (str path " was not deleted")))))) - -(defn try-cleanup-worker [conf supervisor id] - (try - (if (.exists (File. (ConfigUtils/workerRoot conf id))) - (do - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (rmr-as-user conf id (ConfigUtils/workerRoot conf id)) - (do - (Utils/forceDelete (ConfigUtils/workerHeartbeatsRoot conf id)) - ;; this avoids a race condition with worker or subprocess writing pid around same time - (Utils/forceDelete (ConfigUtils/workerPidsRoot conf id)) - (Utils/forceDelete (ConfigUtils/workerRoot conf id)))) - (ConfigUtils/removeWorkerUserWSE conf id) - (remove-dead-worker id) - )) - (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) - (.releaseResourcesForWorker (:resource-isolation-manager supervisor) id)) - (catch IOException e - (log-warn-error e "Failed to cleanup worker " id ". Will retry later")) - (catch RuntimeException e - (log-warn-error e "Failed to cleanup worker " id ". Will retry later") - ) - (catch java.io.FileNotFoundException e (log-message (.getMessage e))) - )) - -(defn shutdown-worker [supervisor id] - (log-message "Shutting down " (:supervisor-id supervisor) ":" id) - (let [conf (:conf supervisor) - pids (Utils/readDirContents (ConfigUtils/workerPidsRoot conf id)) - thread-pid (@(:worker-thread-pids-atom supervisor) id) - shutdown-sleep-secs (conf SUPERVISOR-WORKER-SHUTDOWN-SLEEP-SECS) - as-user (conf SUPERVISOR-RUN-WORKER-AS-USER) - user (ConfigUtils/getWorkerUser conf id)] - (when thread-pid - (ProcessSimulator/killProcess thread-pid)) - (doseq [pid pids] - (if as-user - (worker-launcher-and-wait conf user ["signal" pid "15"] :log-prefix (str "kill -15 " pid)) - (Utils/killProcessWithSigTerm pid))) - (when-not (empty? pids) - (log-message "Sleep " shutdown-sleep-secs " seconds for execution of cleanup threads on worker.") - (Time/sleepSecs shutdown-sleep-secs)) - (doseq [pid pids] - (if as-user - (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid)) - (Utils/forceKillProcess pid)) - (let [path (ConfigUtils/workerPidPath conf id pid)] - (if as-user - (rmr-as-user conf id path) - (try - (log-debug "Removing path " path) - (.delete (File. path)) - (catch Exception e))))) ;; on windows, the supervisor may still holds the lock on the worker directory - (try-cleanup-worker conf supervisor id)) - (log-message "Shut down " (:supervisor-id supervisor) ":" id)) - -(def SUPERVISOR-ZK-ACLS - [(first ZooDefs$Ids/CREATOR_ALL_ACL) - (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)]) - -(defn supervisor-data [conf shared-context ^ISupervisor isupervisor] - {:conf conf - :shared-context shared-context - :isupervisor isupervisor - :active (atom true) - :uptime (Utils/makeUptimeComputer) - :version STORM-VERSION - :worker-thread-pids-atom (atom {}) - :storm-cluster-state (ClusterUtils/mkStormClusterState conf (when (Utils/isZkAuthenticationConfiguredStormServer conf) - SUPERVISOR-ZK-ACLS) - (ClusterStateContext. DaemonType/SUPERVISOR)) - :local-state (ConfigUtils/supervisorState conf) - :supervisor-id (.getSupervisorId isupervisor) - :assignment-id (.getAssignmentId isupervisor) - :my-hostname (Utils/hostname conf) - :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating - :heartbeat-timer (StormTimer. nil - (reify Thread$UncaughtExceptionHandler - (^void uncaughtException - [this ^Thread t ^Throwable e] - (log-error e "Error when processing event") - (Utils/exitProcess 20 "Error when processing an event")))) - :event-timer (StormTimer. nil - (reify Thread$UncaughtExceptionHandler - (^void uncaughtException - [this ^Thread t ^Throwable e] - (log-error e "Error when processing event") - (Utils/exitProcess 20 "Error when processing an event")))) - :blob-update-timer (StormTimer. "blob-update-timer" - (reify Thread$UncaughtExceptionHandler - (^void uncaughtException - [this ^Thread t ^Throwable e] - (log-error e "Error when processing event") - (Utils/exitProcess 20 "Error when processing an event")))) - :localizer (Utils/createLocalizer conf (ConfigUtils/supervisorLocalDir conf)) - :assignment-versions (atom {}) - :sync-retry (atom 0) - :download-lock (Object.) - :stormid->profiler-actions (atom {}) - :resource-isolation-manager (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) - (let [resource-isolation-manager (Utils/newInstance (conf STORM-RESOURCE-ISOLATION-PLUGIN))] - (.prepare resource-isolation-manager conf) - (log-message "Using resource isolation plugin " (conf STORM-RESOURCE-ISOLATION-PLUGIN)) - resource-isolation-manager) - nil) - }) - -(defn required-topo-files-exist? - [conf storm-id] - (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id) - stormjarpath (ConfigUtils/supervisorStormJarPath stormroot) - stormcodepath (ConfigUtils/supervisorStormCodePath stormroot) - stormconfpath (ConfigUtils/supervisorStormConfPath stormroot)] - (and (every? #(Utils/checkFileExists %) [stormroot stormconfpath stormcodepath]) - (or (ConfigUtils/isLocalMode conf) - (Utils/checkFileExists stormjarpath))))) - -(defn get-worker-assignment-helper-msg - [assignment supervisor port id] - (str (pr-str assignment) " for this supervisor " (:supervisor-id supervisor) " on port " - port " with id " id)) - -(defn get-valid-new-worker-ids - [conf supervisor reassign-executors new-worker-ids] - (into {} - (remove nil? - (dofor [[port assignment] reassign-executors] - (let [id (new-worker-ids port) - storm-id (:storm-id assignment) - ^WorkerResources resources (:resources assignment)] - ;; This condition checks for required files exist before launching the worker - (if (required-topo-files-exist? conf storm-id) - (let [pids-path (ConfigUtils/workerPidsRoot conf id) - hb-path (ConfigUtils/workerHeartbeatsRoot conf id)] - (log-message "Launching worker with assignment " - (get-worker-assignment-helper-msg assignment supervisor port id)) - (FileUtils/forceMkdir (File. pids-path)) - (FileUtils/forceMkdir (File. hb-path)) - (launch-worker supervisor - (:storm-id assignment) - port - id - resources) - [id port]) - (do - (log-message "Missing topology storm code, so can't launch worker with assignment " - (get-worker-assignment-helper-msg assignment supervisor port id)) - nil))))))) - - -(defn- select-keys-pred - [pred amap] - (into {} (filter (fn [[k v]] (pred k)) amap))) - -(defn ->local-assignment - [^LocalAssignment thrift-local-assignment] - (mk-local-assignment - (.get_topology_id thrift-local-assignment) - (->executor-list (.get_executors thrift-local-assignment)) - (.get_resources thrift-local-assignment))) - -;TODO: when translating this function, you should replace the map-val with a proper for loop HERE -(defn ls-local-assignments - [^LocalState local-state] - (if-let [thrift-local-assignments (.getLocalAssignmentsMap local-state)] - (map-val ->local-assignment thrift-local-assignments))) - -;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE -(defn sync-processes [supervisor] - (let [conf (:conf supervisor) - ^LocalState local-state (:local-state supervisor) - storm-cluster-state (:storm-cluster-state supervisor) - assigned-executors (or (ls-local-assignments local-state) {}) - now (Time/currentTimeSecs) - allocated (read-allocated-workers supervisor assigned-executors now) - keepers (filter-val - (fn [[state _]] (= state :valid)) - allocated) - keep-ports (set (for [[id [_ hb]] keepers] (:port hb))) - reassign-executors (select-keys-pred (complement keep-ports) assigned-executors) - new-worker-ids (into - {} - (for [port (keys reassign-executors)] - [port (Utils/uuid)]))] - ;; 1. to kill are those in allocated that are dead or disallowed - ;; 2. kill the ones that should be dead - ;; - read pids, kill -9 and individually remove file - ;; - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) - ;; 3. of the rest, figure out what assignments aren't yet satisfied - ;; 4. generate new worker ids, write new "approved workers" to LS - ;; 5. create local dir for worker id - ;; 5. launch new workers (give worker-id, port, and supervisor-id) - ;; 6. wait for workers launch - - (log-debug "Syncing processes") - (log-debug "Assigned executors: " assigned-executors) - (log-debug "Allocated: " allocated) - (doseq [[id [state heartbeat]] allocated] - (when (not= :valid state) - (log-message - "Shutting down and clearing state for id " id - ". Current supervisor time: " now - ". State: " state - ", Heartbeat: " (pr-str heartbeat)) - (shutdown-worker supervisor id))) - (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)] - (.setApprovedWorkers ^LocalState local-state - (merge - (select-keys (clojurify-structure (.getApprovedWorkers ^LocalState local-state)) - (keys keepers)) - valid-new-worker-ids)) - (wait-for-workers-launch conf (keys valid-new-worker-ids))))) - -(defn assigned-storm-ids-from-port-assignments [assignment] - (->> assignment - vals - (map :storm-id) - set)) - -;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE -(defn shutdown-disallowed-workers [supervisor] - (let [conf (:conf supervisor) - ^LocalState local-state (:local-state supervisor) - assigned-executors (or (ls-local-assignments local-state) {}) - now (Time/currentTimeSecs) - allocated (read-allocated-workers supervisor assigned-executors now) - disallowed (keys (filter-val - (fn [[state _]] (= state :disallowed)) - allocated))] - (log-debug "Allocated workers " allocated) - (log-debug "Disallowed workers " disallowed) - (doseq [id disallowed] - (shutdown-worker supervisor id)) - )) - -(defn get-blob-localname - "Given the blob information either gets the localname field if it exists, - else routines the default value passed in." - [blob-info defaultValue] - (or (get blob-info "localname") defaultValue)) - -(defn should-uncompress-blob? - "Given the blob information returns the value of the uncompress field, handling it either being - a string or a boolean value, or if it's not specified then returns false" - [blob-info] - (Boolean. (get blob-info "uncompress"))) - -(defn remove-blob-references - "Remove a reference to a blob when its no longer needed." - [localizer storm-id conf] - (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - topo-name (storm-conf TOPOLOGY-NAME)] - (if blobstore-map - (doseq [[k, v] blobstore-map] - (.removeBlobReference localizer - k - user - topo-name - (should-uncompress-blob? v)))))) - -(defn blobstore-map-to-localresources - "Returns a list of LocalResources based on the blobstore-map passed in." - [blobstore-map] - (if blobstore-map - (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? v))) - ())) - -(defn add-blob-references - "For each of the downloaded topologies, adds references to the blobs that the topologies are - using. This is used to reconstruct the cache on restart." - [localizer storm-id conf] - (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - topo-name (storm-conf TOPOLOGY-NAME) - localresources (blobstore-map-to-localresources blobstore-map)] - (if blobstore-map - (.addReferences localizer localresources user topo-name)))) - -(defn rm-topo-files - [conf storm-id localizer rm-blob-refs?] - (let [path (ConfigUtils/supervisorStormDistRoot conf storm-id)] - (try - (if rm-blob-refs? - (remove-blob-references localizer storm-id conf)) - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (rmr-as-user conf storm-id path) - (Utils/forceDelete (ConfigUtils/supervisorStormDistRoot conf storm-id))) - (catch Exception e - (log-message e (str "Exception removing: " storm-id)))))) - -(defn verify-downloaded-files - "Check for the files exists to avoid supervisor crashing - Also makes sure there is no necessity for locking" - [conf localizer assigned-storm-ids all-downloaded-storm-ids] - (remove nil? - (into #{} - (for [storm-id all-downloaded-storm-ids - :when (contains? assigned-storm-ids storm-id)] - (when-not (required-topo-files-exist? conf storm-id) - (log-debug "Files not present in topology directory") - (rm-topo-files conf storm-id localizer false) - storm-id))))) - -(defn ->LocalAssignment - [{storm-id :storm-id executors :executors resources :resources}] - (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))] - (if resources (.set_resources assignment - (doto (WorkerResources. ) - (.set_mem_on_heap (first resources)) - (.set_mem_off_heap (second resources)) - (.set_cpu (last resources))))) - assignment)) - -;TODO: when translating this function, you should replace the map-val with a proper for loop HERE -(defn ls-local-assignments! - [^LocalState local-state assignments] - (let [local-assignment-map (map-val ->LocalAssignment assignments)] - (.setLocalAssignmentsMap local-state local-assignment-map))) - -(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager] - (fn callback-supervisor [] - (let [conf (:conf supervisor) - storm-cluster-state (:storm-cluster-state supervisor) - ^ISupervisor isupervisor (:isupervisor supervisor) - ^LocalState local-state (:local-state supervisor) - sync-callback (fn [] (.add event-manager (reify Runnable - (^void run [this] - (callback-supervisor))))) - assignment-versions @(:assignment-versions supervisor) - {assignments-snapshot :assignments - storm-id->profiler-actions :profiler-actions - versions :versions} - (assignments-snapshot storm-cluster-state sync-callback assignment-versions) - - storm-code-map (read-storm-code-locations assignments-snapshot) - all-downloaded-storm-ids (set (read-downloaded-storm-ids conf)) - existing-assignment (ls-local-assignments local-state) - all-assignment (read-assignments assignments-snapshot - (:assignment-id supervisor) - existing-assignment - (:sync-retry supervisor)) - ;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE - new-assignment (->> all-assignment - (filter-key #(.confirmAssigned isupervisor %))) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) - localizer (:localizer supervisor) - checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids)) - downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)] - - (log-debug "Synchronizing supervisor") - (log-debug "Storm code map: " storm-code-map) - (log-debug "All assignment: " all-assignment) - (log-debug "New assignment: " new-assignment) - (log-debug "Assigned Storm Ids " assigned-storm-ids) - (log-debug "All Downloaded Ids " all-downloaded-storm-ids) - (log-debug "Checked Downloaded Ids " checked-downloaded-storm-ids) - (log-debug "Downloaded Ids " downloaded-storm-ids) - (log-debug "Storm Ids Profiler Actions " storm-id->profiler-actions) - ;; download code first - ;; This might take awhile - ;; - should this be done separately from usual monitoring? - ;; should we only download when topology is assigned to this supervisor? - (doseq [[storm-id master-code-dir] storm-code-map] - (when (and (not (downloaded-storm-ids storm-id)) - (assigned-storm-ids storm-id)) - (log-message "Downloading code for storm id " storm-id) - (try-cause - (download-storm-code conf storm-id master-code-dir localizer) - - (catch NimbusLeaderNotFoundException e - (log-warn-error e "Nimbus leader was not available.")) - (catch TTransportException e - (log-warn-error e "There was a connection problem with nimbus."))) - (log-message "Finished downloading code for storm id " storm-id))) - - (log-debug "Writing new assignment " - (pr-str new-assignment)) - (doseq [p (set/difference (set (keys existing-assignment)) - (set (keys new-assignment)))] - (.killedWorker isupervisor (int p))) - (.assigned isupervisor (keys new-assignment)) - (ls-local-assignments! local-state - new-assignment) - (reset! (:assignment-versions supervisor) versions) - (reset! (:stormid->profiler-actions supervisor) storm-id->profiler-actions) - - (reset! (:curr-assignment supervisor) new-assignment) - ;; remove any downloaded code that's no longer assigned or active - ;; important that this happens after setting the local assignment so that - ;; synchronize-supervisor doesn't try to launch workers for which the - ;; resources don't exist - (if (Utils/isOnWindows) (shutdown-disallowed-workers supervisor)) - (doseq [storm-id all-downloaded-storm-ids] - (when-not (storm-code-map storm-id) - (log-message "Removing code for storm id " - storm-id) - (rm-topo-files conf storm-id localizer true))) - (.add processes-event-manager (reify Runnable - (^void run [this] - (sync-processes))))))) - -(defn mk-supervisor-capacities - [conf] - {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB)) - Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))}) - -(defn update-blobs-for-topology! - "Update each blob listed in the topology configuration if the latest version of the blob - has not been downloaded." - [conf storm-id localizer] - (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - localresources (blobstore-map-to-localresources blobstore-map)] - (try - (.updateBlobs localizer localresources user) - (catch AuthorizationException authExp - (log-error authExp)) - (catch KeyNotFoundException knf - (log-error knf))))) - -(defn update-blobs-for-all-topologies-fn - "Returns a function that downloads all blobs listed in the topology configuration for all topologies assigned - to this supervisor, and creates version files with a suffix. The returned function is intended to be run periodically - by a timer, created elsewhere." - [supervisor] - (fn [] - (try-cause - (let [conf (:conf supervisor) - downloaded-storm-ids (set (read-downloaded-storm-ids conf)) - new-assignment @(:curr-assignment supervisor) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)] - (doseq [topology-id downloaded-storm-ids] - (let [storm-root (ConfigUtils/supervisorStormDistRoot conf topology-id)] - (when (assigned-storm-ids topology-id) - (log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root) - (update-blobs-for-topology! conf topology-id (:localizer supervisor)))))) - (catch TTransportException e - (log-error - e - "Network error while updating blobs, will retry again later")) - (catch NimbusLeaderNotFoundException e - (log-error - e - "Nimbus unavailable to update blobs, will retry again later"))))) - -(defn jvm-cmd [cmd] - (let [java-home (.get (System/getenv) "JAVA_HOME")] - (if (nil? java-home) - cmd - (str java-home Utils/FILE_PATH_SEPARATOR "bin" Utils/FILE_PATH_SEPARATOR cmd)))) - -(defn java-cmd [] - (jvm-cmd "java")) - -(defn jmap-dump-cmd [profile-cmd pid target-dir] - [profile-cmd pid "jmap" target-dir]) - -(defn jstack-dump-cmd [profile-cmd pid target-dir] - [profile-cmd pid "jstack" target-dir]) - -(defn jprofile-start [profile-cmd pid] - [profile-cmd pid "start"]) - -(defn jprofile-stop [profile-cmd pid target-dir] - [profile-cmd pid "stop" target-dir]) - -(defn jprofile-dump [profile-cmd pid workers-artifacts-directory] - [profile-cmd pid "dump" workers-artifacts-directory]) - -(defn jprofile-jvm-restart [profile-cmd pid] - [profile-cmd pid "kill"]) - -(defn- delete-topology-profiler-action [storm-cluster-state storm-id profile-action] - (log-message "Deleting profiler action.." profile-action) - (.deleteTopologyProfileRequests storm-cluster-state storm-id (thriftify-profile-request profile-action))) - -(defnk launch-profiler-action-for-worker - "Launch profiler action for a worker" - [conf user target-dir command :environment {} :exit-code-on-profile-action nil :log-prefix nil] - (if-let [run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)] - (let [container-file (Utils/containerFilePath target-dir) - script-file (Utils/scriptFilePath target-dir)] - (log-message "Running as user:" user " command:" (Utils/shellCmd command)) - (if (Utils/checkFileExists container-file) (rmr-as-user conf container-file container-file)) - (if (Utils/checkFileExists script-file) (rmr-as-user conf script-file script-file)) - (worker-launcher - conf - user - ["profiler" target-dir (Utils/writeScript target-dir command environment)] - :log-prefix log-prefix - :exit-code-callback exit-code-on-profile-action - :directory (File. target-dir))) - (Utils/launchProcess - command - environment - log-prefix - exit-code-on-profile-action - (File. target-dir)))) - -(defn mk-run-profiler-actions-for-all-topologies - "Returns a function that downloads all profile-actions listed for all topologies assigned - to this supervisor, executes those actions as user and deletes them from zookeeper." - [supervisor] - (fn [] - (try - (let [conf (:conf supervisor) - stormid->profiler-actions @(:stormid->profiler-actions supervisor) - storm-cluster-state (:storm-cluster-state supervisor) - hostname (:my-hostname supervisor) - storm-home (System/getProperty "storm.home") - profile-cmd (str (clojure.java.io/file storm-home - "bin" - (conf WORKER-PROFILER-COMMAND))) - new-assignment @(:curr-assignment supervisor) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)] - (doseq [[storm-id profiler-actions] stormid->profiler-actions] - (when (not (empty? profiler-actions)) - (doseq [pro-action profiler-actions] - (if (= hostname (:host pro-action)) - (let [port (:port pro-action) - action ^ProfileAction (:action pro-action) - stop? (> (System/currentTimeMillis) (:timestamp pro-action)) - target-dir (ConfigUtils/workerArtifactsRoot conf storm-id port) - storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] env {}) - worker-pid (slurp (ConfigUtils/workerArtifactsPidPath conf storm-id port)) - log-prefix (str "ProfilerAction process " storm-id ":" port " PROFILER_ACTION: " action " ") - ;; Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted - ;; The profiler plugin script validates if JVM is recording before starting another recording. - command (cond - (= action ProfileAction/JMAP_DUMP) (jmap-dump-cmd profile-cmd worker-pid target-dir) - (= action ProfileAction/JSTACK_DUMP) (jstack-dump-cmd profile-cmd worker-pid target-dir) - (= action ProfileAction/JPROFILE_DUMP) (jprofile-dump profile-cmd worker-pid target-dir) - (= action ProfileAction/JVM_RESTART) (jprofile-jvm-restart profile-cmd worker-pid) - (and (not stop?) - (= action ProfileAction/JPROFILE_STOP)) - (jprofile-start profile-cmd worker-pid) ;; Ensure the profiler is still running - (and stop? (= action ProfileAction/JPROFILE_STOP)) (jprofile-stop profile-cmd worker-pid target-dir)) - action-on-exit (fn [exit-code] - (log-message log-prefix " profile-action exited for code: " exit-code) - (if stop? - (delete-topology-profiler-action storm-cluster-state storm-id (thriftify-profile-request pro-action)))) - command (->> command (map str) (filter (complement empty?)))] - - (try - (launch-profiler-action-for-worker conf - user - target-dir - command - :environment environment - :exit-code-on-profile-action action-on-exit - :log-prefix log-prefix) - (catch IOException ioe - (log-error ioe - (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later."))) - (catch RuntimeException rte - (log-error rte - (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later.")))))))))) - (catch Exception e - (log-error e "Error running profiler actions, will retry again later"))))) - - -(defn is-waiting [^EventManagerImp event-manager] - (.waiting event-manager)) - -;; in local state, supervisor stores who its current assignments are -;; another thread launches events to restart any dead processes if necessary -(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor] - (log-message "Starting Supervisor with conf " conf) - (.prepare isupervisor conf (ConfigUtils/supervisorIsupervisorDir conf)) - (FileUtils/cleanDirectory (File. (ConfigUtils/supervisorTmpDir conf))) - (let [supervisor (supervisor-data conf shared-context isupervisor) - [event-manager processes-event-manager :as managers] [(EventManagerImp. false) (EventManagerImp. false)] - sync-processes (partial sync-processes supervisor) - synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager) - synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor) - downloaded-storm-ids (set (read-downloaded-storm-ids conf)) - run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor) - heartbeat-fn (fn [] (.supervisorHeartbeat - (:storm-cluster-state supervisor) - (:supervisor-id supervisor) - (thriftify-supervisor-info (->SupervisorInfo (Time/currentTimeSecs) - (:my-hostname supervisor) - (:assignment-id supervisor) - (keys @(:curr-assignment supervisor)) - ;; used ports - (.getMetadata isupervisor) - (conf SUPERVISOR-SCHEDULER-META) - (. (:uptime supervisor) upTime) - (:version supervisor) - (mk-supervisor-capacities conf)))))] - (heartbeat-fn) - - ;; should synchronize supervisor so it doesn't launch anything after being down (optimization) - (.scheduleRecurring (:heartbeat-timer supervisor) - 0 - (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS) - heartbeat-fn) - - (doseq [storm-id downloaded-storm-ids] - (add-blob-references (:localizer supervisor) storm-id - conf)) - ;; do this after adding the references so we don't try to clean things being used - (.startCleaner (:localizer supervisor)) - - (when (conf SUPERVISOR-ENABLE) - ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up - ;; to date even if callbacks don't all work exactly right - (.scheduleRecurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager (reify Runnable - (^void run [this] - (synchronize-supervisor)))))) - - (.scheduleRecurring (:event-timer supervisor) - 0 - (conf SUPERVISOR-MONITOR-FREQUENCY-SECS) - (fn [] (.add processes-event-manager (reify Runnable - (^void run [this] - (sync-processes)))))) - - ;; Blob update thread. Starts with 30 seconds delay, every 30 seconds - (.scheduleRecurring (:blob-update-timer supervisor) - 30 - 30 - (fn [] (.add event-manager (reify Runnable - (^void run [this] - (synchronize-blobs-fn)))))) - - (.scheduleRecurring (:event-timer supervisor) - (* 60 5) - (* 60 5) - (fn [] - (let [health-code (HealthCheck/healthCheck conf) - ids (my-worker-ids conf)] - (if (not (= health-code 0)) - (do - (doseq [id ids] - (shutdown-worker supervisor id)) - (throw (RuntimeException. "Supervisor failed health check. Exiting."))))))) - - - ;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds - (.scheduleRecurring (:event-timer supervisor) - 30 - 30 - (fn [] (.add event-manager (reify Runnable - (^void run [this] - (run-profiler-actions-fn)))))) - ) - (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor)) - (reify - Shutdownable - (shutdown [this] - (log-message "Shutting down supervisor " (:supervisor-id supervisor)) - (reset! (:active supervisor) false) - (.close (:heartbeat-timer supervisor)) - (.close (:event-timer supervisor)) - (.close (:blob-update-timer supervisor)) - (.close event-manager) - (.close processes-event-manager) - (.shutdown (:localizer supervisor)) - (.disconnect (:storm-cluster-state supervisor))) - SupervisorDaemon - (get-conf [this] - conf) - (get-id [this] - (:supervisor-id supervisor)) - (shutdown-all-workers [this] - (let [ids (my-worker-ids conf)] - (doseq [id ids] - (shutdown-worker supervisor id) - ))) - DaemonCommon - (waiting? [this] - (or (not @(:active supervisor)) - (and - (.isTimerWaiting (:heartbeat-timer supervisor)) - (.isTimerWaiting (:event-timer supervisor)) - (every? is-waiting managers))) - )))) - - - -(defn kill-supervisor [supervisor] - (.shutdown supervisor) - ) - -(defn setup-storm-code-dir - [conf storm-conf dir] - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir)))) - -(defn setup-blob-permission - [conf storm-conf path] - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path)))) - -(defn download-blobs-for-topology! - "Download all blobs listed in the topology configuration for a given topology." - [conf stormconf-path localizer tmproot] - (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConfGivenPath conf stormconf-path)) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - topo-name (storm-conf TOPOLOGY-NAME) - user-dir (.getLocalUserFileCacheDir localizer user) - localresources (blobstore-map-to-localresources blobstore-map)] - (when localresources - (when-not (.exists user-dir) - (FileUtils/forceMkdir user-dir)) - (try - (let [localized-resources (.getBlobs localizer localresources user topo-name user-dir)] - (setup-blob-permission conf storm-conf (.toString user-dir)) - (doseq [local-rsrc localized-resources] - (let [rsrc-file-path (File. (.getFilePath local-rsrc)) - key-name (.getName rsrc-file-path) - blob-symlink-target-name (.getName (File. (.getCurrentSymlinkPath local-rsrc))) - symlink-name (get-blob-localname (get blobstore-map key-name) key-name)] - (Utils/createSymlink tmproot (.getParent rsrc-file-path) symlink-name - blob-symlink-target-name)))) - (catch AuthorizationException authExp - (log-error authExp)) - (catch KeyNotFoundException knf - (log-error knf)))))) - -(defn get-blob-file-names - [blobstore-map] - (if blobstore-map - (for [[k, data] blobstore-map] - (get-blob-localname data k)))) - -(defn download-blobs-for-topology-succeed? - "Assert if all blobs are downloaded for the given topology" - [stormconf-path target-dir] - (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path)))) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - file-names (get-blob-file-names blobstore-map)] - (if-not (empty? file-names) - (every? #(Utils/checkFileExists target-dir %) file-names) - true))) - -;; distributed implementation -(defmethod download-storm-code - :distributed [conf storm-id master-code-dir localizer] - ;; Downloading to permanent location is atomic - - (let [tmproot (str (ConfigUtils/supervisorTmpDir conf) Utils/FILE_PATH_SEPARATOR (Utils/uuid)) - stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id) - blobstore (Utils/getClientBlobStoreForSupervisor conf)] - (FileUtils/forceMkdir (File. tmproot)) - (if-not (Utils/isOnWindows) - (Utils/restrictPermissions tmproot) - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (throw (RuntimeException. (str "ERROR: Windows doesn't implement setting the correct permissions"))))) - (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormJarKey storm-id) - (ConfigUtils/supervisorStormJarPath tmproot) blobstore) - (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormCodeKey storm-id) - (ConfigUtils/supervisorStormCodePath tmproot) blobstore) - (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormConfKey storm-id) - (ConfigUtils/supervisorStormConfPath tmproot) blobstore) - (.shutdown blobstore) - (Utils/extractDirFromJar (ConfigUtils/supervisorStormJarPath tmproot) ConfigUtils/RESOURCES_SUBDIR tmproot) - (download-blobs-for-topology! conf (ConfigUtils/supervisorStormConfPath tmproot) localizer - tmproot) - (if (download-blobs-for-topology-succeed? (ConfigUtils/supervisorStormConfPath tmproot) tmproot) - (do - (log-message "Successfully downloaded blob resources for storm-id " storm-id) - (FileUtils/forceMkdir (File. stormroot)) - (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot)) - (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE))) - (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot)) - (do - (log-message "Failed to download blob resources for storm-id " storm-id) - (Utils/forceDelete tmproot))))) - -(defn write-log-metadata-to-yaml-file! [storm-id port data conf] - (let [file (ConfigUtils/getLogMetaDataFile conf storm-id port)] - ;;run worker as user needs the directory to have special permissions - ;; or it is insecure - (when (not (.exists (.getParentFile file))) - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (do (FileUtils/forceMkdir (.getParentFile file)) - (setup-storm-code-dir - conf - (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) - (.getCanonicalPath (.getParentFile file)))) - (.mkdirs (.getParentFile file)))) - (let [writer (java.io.FileWriter. file) - yaml (Yaml.)] - (try - (.dump yaml data writer) - (finally - (.close writer)))))) - -(defn write-log-metadata! [storm-conf user worker-id storm-id port conf] - (let [data {TOPOLOGY-SUBMITTER-USER user - "worker-id" worker-id - LOGS-GROUPS (sort (distinct (remove nil? - (concat - (storm-conf LOGS-GROUPS) - (storm-conf TOPOLOGY-GROUPS))))) - LOGS-USERS (sort (distinct (remove nil? - (concat - (storm-conf LOGS-USERS) - (storm-conf TOPOLOGY-USERS)))))}] - (write-log-metadata-to-yaml-file! storm-id port data conf))) - -(defn jlp [stormroot conf] - (let [resource-root (str stormroot File/separator ConfigUtils/RESOURCES_SUBDIR) - os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_") - arch (System/getProperty "os.arch") - arch-resource-root (str resource-root File/separator os "-" arch)] - (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH)))) - -(defn substitute-childopts - "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap" - [value worker-id topology-id port mem-onheap] - (let [replacement-map {"%ID%" (str port) - "%WORKER-ID%" (str worker-id) - "%TOPOLOGY-ID%" (str topology-id) - "%WORKER-PORT%" (str port) - "%HEAP-MEM%" (str mem-onheap)} - sub-fn #(reduce (fn [string entry] - (apply clojure.string/replace string entry)) - % - replacement-map)] - (cond - (nil? value) nil - (sequential? value) (vec (map sub-fn value)) - :else (-> value sub-fn (clojure.string/split #"\s+"))))) - - -(defn create-blobstore-links - "Create symlinks in worker launch directory for all blobs" - [conf storm-id worker-id] - (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id) - storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) - workerroot (ConfigUtils/workerRoot conf worker-id) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - blob-file-names (get-blob-file-names blobstore-map) - resource-file-names (cons ConfigUtils/RESOURCES_SUBDIR blob-file-names)] - (log-message "Creating symlinks for worker-id: " worker-id " storm-id: " - storm-id " for files(" (count resource-file-names) "): " (pr-str resource-file-names)) - (Utils/createSymlink workerroot stormroot ConfigUtils/RESOURCES_SUBDIR) - (doseq [file-name blob-file-names] - (Utils/createSymlink workerroot stormroot file-name file-name)))) - -(defn create-artifacts-link - "Create a symlink from workder directory to its port artifacts directory" - [conf storm-id port worker-id] - (let [worker-dir (ConfigUtils/workerRoot conf worker-id) - topo-dir (ConfigUtils/workerArtifactsRoot conf storm-id)] - (log-message "Creating symlinks for worker-id: " worker-id " storm-id: " - storm-id " to its port artifacts directory") - (if (.exists (File. worker-dir)) - (Utils/createSymlink worker-dir topo-dir "artifacts" (str port))))) - -(defmethod launch-worker - :distributed [supervisor storm-id port worker-id resources] - (let [conf (:conf supervisor) - run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER) - storm-home (System/getProperty "storm.home") - storm-options (System/getProperty "storm.options") - storm-conf-file (System/getProperty "storm.conf.file") - storm-log-dir (ConfigUtils/getLogDir) - storm-log-conf-dir (conf STORM-LOG4J2-CONF-DIR) - storm-log4j2-conf-dir (if storm-log-conf-dir - (if (.isAbsolute (File. storm-log-conf-dir)) - storm-log-conf-dir - (str storm-home Utils/FILE_PATH_SEPARATOR storm-log-conf-dir)) - (str storm-home Utils/FILE_PATH_SEPARATOR "log4j2")) - stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id) - jlp (jlp stormroot conf) - stormjar (ConfigUtils/supervisorStormJarPath stormroot) - storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) - topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)] - [cp] - []) - classpath (-> (Utils/workerClasspath) - (Utils/addToClasspath [stormjar]) - (Utils/addToClasspath topo-classpath)) - top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS) - - mem-onheap (if (and (.get_mem_on_heap resources) (> (.get_mem_on_heap resources) 0)) ;; not nil and not zero - (int (Math/ceil (.get_mem_on_heap resources))) ;; round up - (storm-conf WORKER-HEAP-MEMORY-MB)) ;; otherwise use default value - - mem-offheap (int (Math/ceil (.get_mem_off_heap resources))) - - cpu (int (Math/ceil (.get_cpu resources))) - - gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port mem-onheap) - topo-worker-logwriter-childopts (storm-conf TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - logfilename "worker.log" - workers-artifacts (ConfigUtils/workerArtifactsRoot conf) - logging-sensitivity (storm-conf TOPOLOGY-LOGGING-SENSITIVITY "S3") - worker-childopts (when-let [s (conf WORKER-CHILDOPTS)] - (substitute-childopts s worker-id storm-id port mem-onheap)) - topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)] - (substitute-childopts s worker-id storm-id port mem-onheap)) - worker--profiler-childopts (if (conf WORKER-PROFILER-ENABLED) - (substitute-childopts (conf WORKER-PROFILER-CHILDOPTS) worker-id storm-id port mem-onheap) - "") - topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] - (merge env {"LD_LIBRARY_PATH" jlp}) - {"LD_LIBRARY_PATH" jlp}) - - log4j-configuration-file (str (if (.startsWith (System/getProperty "os.name") "Windows") - (if (.startsWith storm-log4j2-conf-dir "file:") - storm-log4j2-conf-dir - (str "file:///" storm-log4j2-conf-dir)) - storm-log4j2-conf-dir) - Utils/FILE_PATH_SEPARATOR "worker.xml") - - command (concat - [(java-cmd) "-cp" classpath - topo-worker-logwriter-childopts - (str "-Dlogfile.name=" logfilename) - (str "-Dstorm.home=" storm-home) - (str "-Dworkers.artifacts=" workers-artifacts) - (str "-Dstorm.id=" storm-id) - (str "-Dworker.id=" worker-id) - (str "-Dworker.port=" port) - (str "-Dstorm.log.dir=" storm-log-dir) - (str "-Dlog4j.configurationFile=" log4j-configuration-file) - (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector") - "org.apache.storm.LogWriter"] - [(java-cmd) "-server"] - worker-childopts - topo-worker-childopts - gc-opts - worker--profiler-childopts - [(str "-Djava.library.path=" jlp) - (str "-Dlogfile.name=" logfilename) - (str "-Dstorm.home=" storm-home) - (str "-Dworkers.artifacts=" workers-artifacts) - (str "-Dstorm.conf.file=" storm-conf-file) - (str "-Dstorm.options=" storm-options) - (str "-Dstorm.log.dir=" storm-log-dir) - (str "-Dlogging.sensitivity=" logging-sensitivity) - (str "-Dlog4j.configurationFile=" log4j-configuration-file) - (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector") - (str "-Dstorm.id=" storm-id) - (str "-Dworker.id=" worker-id) - (str "-Dworker.port=" port) - "-cp" classpath - "org.apache.storm.daemon.worker" - storm-id - (:assignment-id supervisor) - port - worker-id]) - command (->> command - (map str) - (filter (complement empty?))) - command (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) - (do - (.reserveResourcesForWorker (:resource-isolation-manager supervisor) worker-id - {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))}) - (.getLaunchCommand (:resource-isolation-manager supervisor) worker-id - (java.util.ArrayList. (java.util.Arrays/asList (to-array command))))) - command)] - (log-message "Launching worker with command: " (Utils/shellCmd command)) - (write-log-metadata! storm-conf user worker-id storm-id port conf) - (ConfigUtils/setWorkerUserWSE conf worker-id user) - (create-artifacts-link conf storm-id port worker-id) - (let [log-prefix (str "Worker Process " worker-id) - callback (reify Utils$ExitCodeCallable - (call [this exit-code] - (log-message log-prefix " exited with code: " exit-code) - (add-dead-worker worker-id))) - worker-dir (ConfigUtils/workerRoot conf worker-id)] - (remove-dead-worker worker-id) - (create-blobstore-links conf storm-id worker-id) - (if run-worker-as-user - (worker-launcher conf user ["worker" worker-dir (Utils/writeScript worker-dir command topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)) - (Utils/launchProcess command - topology-worker-environment - log-prefix - callback - (File. worker-dir)))))) - -;; local implementation - -(defn resources-jar [] - (->> (.split (Utils/currentClasspath) File/pathSeparator) - (filter #(.endsWith % ".jar")) - (filter #(Utils/zipDoesContainDir % ConfigUtils/RESOURCES_SUBDIR)) - first )) - -(defmethod download-storm-code - :local [conf storm-id master-code-dir localizer] - (let [tmproot (str (ConfigUtils/supervisorTmpDir conf) Utils/FILE_PATH_SEPARATOR (Utils/uuid)) - stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id) - blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)] - (try - (FileUtils/forceMkdir (File. tmproot)) - (.readBlobTo blob-store (ConfigUtils/masterStormCodeKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormCodePath tmproot)) nil) - (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot)) nil) - (finally - (.shutdown blob-store))) - (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) - - (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot) - (let [classloader (.getContextClassLoader (Thread/currentThread)) - resources-jar (resources-jar) - url (.getResource classloader ConfigUtils/RESOURCES_SUBDIR) - target-dir (str stormroot Utils/FILE_PATH_SEPARATOR ConfigUtils/RESOURCES_SUBDIR)] - (cond - resources-jar - (do - (log-message "Extracting resources from jar at " resources-jar " to " target-dir) - (Utils/extractDirFromJar resources-jar ConfigUtils/RESOURCES_SUBDIR stormroot)) - url - (do - (log-message "Copying resources at " (str url) " to " target-dir) - (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir))))))) - -(defmethod launch-worker - :local [supervisor storm-id port worker-id resources] - (let [conf (:conf supervisor) - pid (Utils/uuid) - worker (worker/mk-worker conf - (:shared-context supervisor) - storm-id - (:assignment-id supervisor) - port - worker-id)] - (ConfigUtils/setWorkerUserWSE conf worker-id "") - (ProcessSimulator/registerProcess pid worker) - (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid) - )) - -(defn -launch - [supervisor] - (log-message "Starting supervisor for storm version '" STORM-VERSION "'") - (let [conf (clojurify-structure (ConfigUtils/readStormConfig))] - (validate-distributed-mode! conf) - (let [supervisor (mk-supervisor conf nil supervisor)] - (Utils/addShutdownHookWithForceKillIn1Sec #(.shutdown supervisor))) - (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf))) - (start-metrics-reporters conf))) - -(defn standalone-supervisor [] - (let [conf-atom (atom nil) - id-atom (atom nil)] - (reify ISupervisor - (prepare [this conf local-dir] - (reset! conf-atom conf) - (let [state (LocalState. local-dir) - curr-id (if-let [id (.getSupervisorId state)] - id - (generate-supervisor-id))] - (.setSupervisorId state curr-id) - (reset! id-atom curr-id)) - ) - (confirmAssigned [this port] - true) - (getMetadata [this] - (doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS)))) - (getSupervisorId [this] - @id-atom) - (getAssignmentId [this] - @id-atom) - (killedWorker [this port] - ) - (assigned [this ports] - )))) - -(defn -main [] - (Utils/setupDefaultUncaughtExceptionHandler) - (-launch (standalone-supervisor))) http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index 8242c3e..a5dd1c0 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -17,14 +17,15 @@ (ns org.apache.storm.testing (:require [org.apache.storm.daemon [nimbus :as nimbus] - [supervisor :as supervisor] + [local-supervisor :as local-supervisor] [common :as common] [worker :as worker] [executor :as executor]]) (:import [org.apache.commons.io FileUtils] [org.apache.storm.utils] [org.apache.storm.zookeeper Zookeeper] - [org.apache.storm ProcessSimulator]) + [org.apache.storm ProcessSimulator] + [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData ShutdownWork SupervisorManger]) (:import [java.io File]) (:import [java.util HashMap ArrayList]) (:import [java.util.concurrent.atomic AtomicInteger]) @@ -137,8 +138,10 @@ conf {STORM-LOCAL-DIR tmp-dir SUPERVISOR-SLOTS-PORTS port-ids}) - id-fn (if id (fn [] id) supervisor/generate-supervisor-id) - daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))] + id-fn (if id id (Utils/uuid)) + isupervisor (proxy [StandaloneSupervisor] [] + (generateSupervisorId [] id-fn)) + daemon (local-supervisor/mk-local-supervisor supervisor-conf (:shared-context cluster-map) isupervisor)] (swap! (:supervisors cluster-map) conj daemon) (swap! (:tmp-dirs cluster-map) conj tmp-dir) daemon)) @@ -209,7 +212,7 @@ cluster-map)) (defn get-supervisor [cluster-map supervisor-id] - (let [pred (reify IPredicate (test [this x] (= (.get-id x) supervisor-id)))] + (let [pred (reify IPredicate (test [this x] (= (.getId x) supervisor-id)))] (Utils/findOne pred @(:supervisors cluster-map)))) (defn remove-first @@ -220,8 +223,8 @@ (concat b (rest e)))) (defn kill-supervisor [cluster-map supervisor-id] - (let [finder-fn #(= (.get-id %) supervisor-id) - pred (reify IPredicate (test [this x] (= (.get-id x) supervisor-id))) + (let [finder-fn #(= (.getId %) supervisor-id) + pred (reify IPredicate (test [this x] (= (.getId x) supervisor-id))) supervisors @(:supervisors cluster-map) sup (Utils/findOne pred supervisors)] @@ -241,9 +244,9 @@ (.close (:state cluster-map)) (.disconnect (:storm-cluster-state cluster-map)) (doseq [s @(:supervisors cluster-map)] - (.shutdown-all-workers s) + (.shutdownAllWorkers s) ;; race condition here? will it launch the workers again? - (supervisor/kill-supervisor s)) + (.shutdown s)) (ProcessSimulator/killAllProcesses) (if (not-nil? (:zookeeper cluster-map)) (do @@ -279,6 +282,8 @@ ([timeout-ms apredicate] (while-timeout timeout-ms (not (apredicate)) (Time/sleep 100)))) +(defn is-supervisor-waiting [^SupervisorManger supervisor] + (.isWaiting supervisor)) (defn wait-until-cluster-waiting "Wait until the cluster is idle. Should be used with time simulation." @@ -289,10 +294,10 @@ workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles))) daemons (concat [(:nimbus cluster-map)] - supervisors ; because a worker may already be dead workers)] - (while-timeout timeout-ms (not (every? (memfn waiting?) daemons)) + (while-timeout timeout-ms (or (not (every? (memfn waiting?) daemons)) + (not (every? is-supervisor-waiting supervisors))) (Thread/sleep (rand-int 20)) ;; (doseq [d daemons] ;; (if-not ((memfn waiting?) d) @@ -386,12 +391,13 @@ (submit-local-topology nimbus storm-name conf topology))) (defn mk-capture-launch-fn [capture-atom] - (fn [supervisor storm-id port worker-id mem-onheap] - (let [supervisor-id (:supervisor-id supervisor) - conf (:conf supervisor) - existing (get @capture-atom [supervisor-id port] [])] - (ConfigUtils/setWorkerUserWSE conf worker-id "") - (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id))))) + (fn [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + supervisorId (.getSupervisorId supervisorData) + existing (get @capture-atom [supervisorId port] [])] + (log-message "mk-capture-launch-fn") + (ConfigUtils/setWorkerUserWSE conf workerId "") + (swap! capture-atom assoc [supervisorId port] (conj existing stormId))))) (defn find-worker-id [supervisor-conf port] @@ -407,21 +413,22 @@ (defn mk-capture-shutdown-fn [capture-atom] - (let [existing-fn supervisor/shutdown-worker] - (fn [supervisor worker-id] - (let [conf (:conf supervisor) - supervisor-id (:supervisor-id supervisor) - port (find-worker-port conf worker-id) + (let [shut-down (ShutdownWork.)] + (fn [supervisorData workerId] + (let [conf (.getConf supervisorData) + supervisor-id (.getSupervisorId supervisorData) + port (find-worker-port conf workerId) existing (get @capture-atom [supervisor-id port] 0)] + (log-message "mk-capture-shutdown-fn") (swap! capture-atom assoc [supervisor-id port] (inc existing)) - (existing-fn supervisor worker-id))))) + (.shutWorker shut-down supervisorData workerId))))) (defmacro capture-changed-workers [& body] `(let [launch-captured# (atom {}) shutdown-captured# (atom {})] - (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#) - supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)] + (with-var-roots [local-supervisor/launch-local-worker (mk-capture-launch-fn launch-captured#) + local-supervisor/shutdown-local-worker (mk-capture-shutdown-fn shutdown-captured#)] ~@body {:launched @launch-captured# :shutdown @shutdown-captured#}))) http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java index 674454b..19328e5 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java @@ -31,16 +31,15 @@ import java.io.File; import java.io.IOException; import java.util.*; -public abstract class ShutdownWork implements Shutdownable { +public class ShutdownWork implements Shutdownable { private static Logger LOG = LoggerFactory.getLogger(ShutdownWork.class); public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException { - LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId); Map conf = supervisorData.getConf(); Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); - Integer shutdownSleepSecs = (Integer) conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS); + Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); String user = ConfigUtils.getWorkerUser(conf, workerId); String threadPid = supervisorData.getWorkerThreadPidsAtom().get(workerId); @@ -109,13 +108,13 @@ public abstract class ShutdownWork implements Shutdownable { ConfigUtils.removeWorkerUserWSE(conf, workerId); supervisorData.getDeadWorkers().remove(workerId); } - if (conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE) != null) { + if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){ supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId); } } catch (IOException e) { - LOG.warn("{} Failed to cleanup worker {}. Will retry later", e, workerId); + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); } catch (RuntimeException e) { - LOG.warn("{} Failed to cleanup worker {}. Will retry later", e, workerId); + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java index da54b88..c13df8b 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java @@ -20,6 +20,7 @@ package org.apache.storm.daemon.supervisor; import org.apache.storm.Config; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; import java.io.IOException; import java.util.Collection; @@ -38,7 +39,7 @@ public class StandaloneSupervisor implements ISupervisor { LocalState localState = new LocalState(schedulerLocalDir); String supervisorId = localState.getSupervisorId(); if (supervisorId == null) { - supervisorId = UUID.randomUUID().toString(); + supervisorId = generateSupervisorId(); localState.setSupervisorId(supervisorId); } this.conf = stormConf; @@ -79,4 +80,8 @@ public class StandaloneSupervisor implements ISupervisor { public void assigned(Collection<Integer> ports) { } + + public String generateSupervisorId(){ + return Utils.uuid(); + } } \ No newline at end of file
