http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj deleted file mode 100644 index 1ae4356..0000000 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ /dev/null @@ -1,1219 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns backtype.storm.daemon.supervisor - (:import [java.io File IOException FileOutputStream]) - (:import [backtype.storm.scheduler ISupervisor] - [backtype.storm.utils LocalState Time Utils] - [backtype.storm.daemon Shutdownable] - [backtype.storm Constants] - [backtype.storm.cluster ClusterStateContext DaemonType] - [java.net JarURLConnection] - [java.net URI] - [org.apache.commons.io FileUtils]) - (:use [backtype.storm config util log timer local-state]) - (:import [backtype.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) - (:import [backtype.storm.utils NimbusLeaderNotFoundException VersionInfo]) - (:import [java.nio.file Files StandardCopyOption]) - (:import [backtype.storm Config]) - (:import [backtype.storm.generated WorkerResources ProfileAction]) - (:import [backtype.storm.localizer LocalResource]) - (:use [backtype.storm.daemon common]) - (:require [backtype.storm.command [healthcheck :as healthcheck]]) - (:require [backtype.storm.daemon [worker :as worker]] - [backtype.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]] - [clojure.set :as set]) - (:import [org.apache.thrift.transport TTransportException]) - (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) - (:import [org.yaml.snakeyaml Yaml] - [org.yaml.snakeyaml.constructor SafeConstructor]) - (:require [metrics.gauges :refer [defgauge]]) - (:require [metrics.meters :refer [defmeter mark!]]) - (:gen-class - :methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]])) - -(defmeter supervisor:num-workers-launched) - -(defmulti download-storm-code cluster-mode) -(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor)))) - -(def STORM-VERSION (VersionInfo/getVersion)) - -(defprotocol SupervisorDaemon - (get-id [this]) - (get-conf [this]) - (shutdown-all-workers [this]) - ) - -(defn- assignments-snapshot [storm-cluster-state callback assignment-versions] - (let [storm-ids (.assignments 
storm-cluster-state callback)] - (let [new-assignments - (->> - (dofor [sid storm-ids] - (let [recorded-version (:version (get assignment-versions sid))] - (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)] - (if (= assignment-version recorded-version) - {sid (get assignment-versions sid)} - {sid (.assignment-info-with-version storm-cluster-state sid callback)}) - {sid nil}))) - (apply merge) - (filter-val not-nil?)) - new-profiler-actions - (->> - (dofor [sid (distinct storm-ids)] - (if-let [topo-profile-actions (.get-topology-profile-requests storm-cluster-state sid false)] - {sid topo-profile-actions})) - (apply merge))] - - {:assignments (into {} (for [[k v] new-assignments] [k (:data v)])) - :profiler-actions new-profiler-actions - :versions new-assignments}))) - -(defn- read-my-executors [assignments-snapshot storm-id assignment-id] - (let [assignment (get assignments-snapshot storm-id) - my-slots-resources (into {} - (filter (fn [[[node _] _]] (= node assignment-id)) - (:worker->resources assignment))) - my-executors (filter (fn [[_ [node _]]] (= node assignment-id)) - (:executor->node+port assignment)) - port-executors (apply merge-with - concat - (for [[executor [_ port]] my-executors] - {port [executor]} - ))] - (into {} (for [[port executors] port-executors] - ;; need to cast to int b/c it might be a long (due to how yaml parses things) - ;; doall is to avoid serialization/deserialization problems with lazy seqs - [(Integer. 
port) (mk-local-assignment storm-id (doall executors) (get my-slots-resources [assignment-id port]))] - )))) - -(defn- read-assignments - "Returns map from port to struct containing :storm-id, :executors and :resources" - ([assignments-snapshot assignment-id] - (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id)) - (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port"))))) - ([assignments-snapshot assignment-id existing-assignment retries] - (try (let [assignments (read-assignments assignments-snapshot assignment-id)] - (reset! retries 0) - assignments) - (catch RuntimeException e - (if (> @retries 2) (throw e) (swap! retries inc)) - (log-warn (.getMessage e) ": retrying " @retries " of 3") - existing-assignment)))) - -(defn- read-storm-code-locations - [assignments-snapshot] - (map-val :master-code-dir assignments-snapshot)) - -(defn- read-downloaded-storm-ids [conf] - (map #(url-decode %) (read-dir-contents (supervisor-stormdist-root conf))) - ) - -(defn read-worker-heartbeat [conf id] - (let [local-state (worker-state conf id)] - (try - (ls-worker-heartbeat local-state) - (catch Exception e - (log-warn e "Failed to read local heartbeat for workerId : " id ",Ignoring exception.") - nil)))) - - -(defn my-worker-ids [conf] - (read-dir-contents (worker-root conf))) - -(defn read-worker-heartbeats - "Returns map from worker id to heartbeat" - [conf] - (let [ids (my-worker-ids conf)] - (into {} - (dofor [id ids] - [id (read-worker-heartbeat conf id)])) - )) - - -(defn matches-an-assignment? 
[worker-heartbeat assigned-executors] - (let [local-assignment (assigned-executors (:port worker-heartbeat))] - (and local-assignment - (= (:storm-id worker-heartbeat) (:storm-id local-assignment)) - (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID) - (set (:executors local-assignment)))))) - -(let [dead-workers (atom #{})] - (defn get-dead-workers [] - @dead-workers) - (defn add-dead-worker [worker] - (swap! dead-workers conj worker)) - (defn remove-dead-worker [worker] - (swap! dead-workers disj worker))) - -(defn is-worker-hb-timed-out? [now hb conf] - (> (- now (:time-secs hb)) - (conf SUPERVISOR-WORKER-TIMEOUT-SECS))) - -(defn read-allocated-workers - "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)" - [supervisor assigned-executors now] - (let [conf (:conf supervisor) - ^LocalState local-state (:local-state supervisor) - id->heartbeat (read-worker-heartbeats conf) - approved-ids (set (keys (ls-approved-workers local-state)))] - (into - {} - (dofor [[id hb] id->heartbeat] - (let [state (cond - (not hb) - :not-started - (or (not (contains? approved-ids id)) - (not (matches-an-assignment? hb assigned-executors))) - :disallowed - (or - (when (get (get-dead-workers) id) - (log-message "Worker Process " id " has died!") - true) - (is-worker-hb-timed-out? 
now hb conf)) - :timed-out - true - :valid)] - (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now) - [id [state hb]] - )) - ))) - -(defn- wait-for-worker-launch [conf id start-time] - (let [state (worker-state conf id)] - (loop [] - (let [hb (ls-worker-heartbeat state)] - (when (and - (not hb) - (< - (- (current-time-secs) start-time) - (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS) - )) - (log-message id " still hasn't started") - (Time/sleep 500) - (recur) - ))) - (when-not (ls-worker-heartbeat state) - (log-message "Worker " id " failed to start") - ))) - -(defn- wait-for-workers-launch [conf ids] - (let [start-time (current-time-secs)] - (doseq [id ids] - (wait-for-worker-launch conf id start-time)) - )) - -(defn generate-supervisor-id [] - (uuid)) - -(defnk worker-launcher [conf user args :environment {} :log-prefix nil :exit-code-callback nil :directory nil] - (let [_ (when (clojure.string/blank? user) - (throw (java.lang.IllegalArgumentException. - "User cannot be blank when calling worker-launcher."))) - wl-initial (conf SUPERVISOR-WORKER-LAUNCHER) - storm-home (System/getProperty "storm.home") - wl (if wl-initial wl-initial (str storm-home "/bin/worker-launcher")) - command (concat [wl user] args)] - (log-message "Running as user:" user " command:" (pr-str command)) - (launch-process command :environment environment :log-prefix log-prefix :exit-code-callback exit-code-callback :directory directory) - )) - -(defnk worker-launcher-and-wait [conf user args :environment {} :log-prefix nil] - (let [process (worker-launcher conf user args :environment environment)] - (if log-prefix - (read-and-log-stream log-prefix (.getInputStream process))) - (try - (.waitFor process) - (catch InterruptedException e - (log-message log-prefix " interrupted."))) - (.exitValue process))) - -(defn- rmr-as-user - "Launches a process owned by the given user that deletes the given path - recursively. 
Throws RuntimeException if the directory is not removed." - [conf id path] - (let [user (Utils/getFileOwner path)] - (worker-launcher-and-wait conf - user - ["rmr" path] - :log-prefix (str "rmr " id)) - (if (exists-file? path) - (throw (RuntimeException. (str path " was not deleted")))))) - -(defn try-cleanup-worker [conf id] - (try - (if (.exists (File. (worker-root conf id))) - (do - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (rmr-as-user conf id (worker-root conf id)) - (do - (rmr (worker-heartbeats-root conf id)) - ;; this avoids a race condition with worker or subprocess writing pid around same time - (rmr (worker-pids-root conf id)) - (rmr (worker-root conf id)))) - (remove-worker-user! conf id) - (remove-dead-worker id) - )) - (catch IOException e - (log-warn-error e "Failed to cleanup worker " id ". Will retry later")) - (catch RuntimeException e - (log-warn-error e "Failed to cleanup worker " id ". Will retry later") - ) - (catch java.io.FileNotFoundException e (log-message (.getMessage e))) - )) - -(defn shutdown-worker [supervisor id] - (log-message "Shutting down " (:supervisor-id supervisor) ":" id) - (let [conf (:conf supervisor) - pids (read-dir-contents (worker-pids-root conf id)) - thread-pid (@(:worker-thread-pids-atom supervisor) id) - shutdown-sleep-secs (conf SUPERVISOR-WORKER-SHUTDOWN-SLEEP-SECS) - as-user (conf SUPERVISOR-RUN-WORKER-AS-USER) - user (get-worker-user conf id)] - (when thread-pid - (psim/kill-process thread-pid)) - (doseq [pid pids] - (if as-user - (worker-launcher-and-wait conf user ["signal" pid "15"] :log-prefix (str "kill -15 " pid)) - (kill-process-with-sig-term pid))) - (when-not (empty? 
pids) - (log-message "Sleep " shutdown-sleep-secs " seconds for execution of cleanup threads on worker.") - (sleep-secs shutdown-sleep-secs)) - (doseq [pid pids] - (if as-user - (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid)) - (force-kill-process pid)) - (if as-user - (rmr-as-user conf id (worker-pid-path conf id pid)) - (try - (rmpath (worker-pid-path conf id pid)) - (catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory - (try-cleanup-worker conf id)) - (log-message "Shut down " (:supervisor-id supervisor) ":" id)) - -(def SUPERVISOR-ZK-ACLS - [(first ZooDefs$Ids/CREATOR_ALL_ACL) - (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)]) - -(defn supervisor-data [conf shared-context ^ISupervisor isupervisor] - {:conf conf - :shared-context shared-context - :isupervisor isupervisor - :active (atom true) - :uptime (uptime-computer) - :version STORM-VERSION - :worker-thread-pids-atom (atom {}) - :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when - (Utils/isZkAuthenticationConfiguredStormServer - conf) - SUPERVISOR-ZK-ACLS) - :context (ClusterStateContext. DaemonType/SUPERVISOR)) - :local-state (supervisor-state conf) - :supervisor-id (.getSupervisorId isupervisor) - :assignment-id (.getAssignmentId isupervisor) - :my-hostname (hostname conf) - :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating - :heartbeat-timer (mk-timer :kill-fn (fn [t] - (log-error t "Error when processing event") - (exit-process! 20 "Error when processing an event") - )) - :event-timer (mk-timer :kill-fn (fn [t] - (log-error t "Error when processing event") - (exit-process! 20 "Error when processing an event") - )) - :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer - [t] - (log-error t "Error when processing event") - (exit-process! 
20 "Error when processing a event")) - :timer-name "blob-update-timer") - :localizer (Utils/createLocalizer conf (supervisor-local-dir conf)) - :assignment-versions (atom {}) - :sync-retry (atom 0) - :download-lock (Object.) - :stormid->profiler-actions (atom {}) - }) - -(defn required-topo-files-exist? - [conf storm-id] - (let [stormroot (supervisor-stormdist-root conf storm-id) - stormjarpath (supervisor-stormjar-path stormroot) - stormcodepath (supervisor-stormcode-path stormroot) - stormconfpath (supervisor-stormconf-path stormroot)] - (and (every? exists-file? [stormroot stormconfpath stormcodepath]) - (or (local-mode? conf) - (exists-file? stormjarpath))))) - -(defn get-worker-assignment-helper-msg - [assignment supervisor port id] - (str (pr-str assignment) " for this supervisor " (:supervisor-id supervisor) " on port " - port " with id " id)) - -(defn get-valid-new-worker-ids - [conf supervisor reassign-executors new-worker-ids] - (into {} - (remove nil? - (dofor [[port assignment] reassign-executors] - (let [id (new-worker-ids port) - storm-id (:storm-id assignment) - ^WorkerResources resources (:resources assignment) - mem-onheap (.get_mem_on_heap resources)] - ;; This condition checks for required files exist before launching the worker - (if (required-topo-files-exist? 
conf storm-id) - (do - (log-message "Launching worker with assignment " - (get-worker-assignment-helper-msg assignment supervisor port id)) - (local-mkdirs (worker-pids-root conf id)) - (local-mkdirs (worker-heartbeats-root conf id)) - (launch-worker supervisor - (:storm-id assignment) - port - id - mem-onheap) - [id port]) - (do - (log-message "Missing topology storm code, so can't launch worker with assignment " - (get-worker-assignment-helper-msg assignment supervisor port id)) - nil))))))) - -(defn sync-processes [supervisor] - (let [conf (:conf supervisor) - ^LocalState local-state (:local-state supervisor) - storm-cluster-state (:storm-cluster-state supervisor) - assigned-executors (defaulted (ls-local-assignments local-state) {}) - now (current-time-secs) - allocated (read-allocated-workers supervisor assigned-executors now) - keepers (filter-val - (fn [[state _]] (= state :valid)) - allocated) - keep-ports (set (for [[id [_ hb]] keepers] (:port hb))) - reassign-executors (select-keys-pred (complement keep-ports) assigned-executors) - new-worker-ids (into - {} - (for [port (keys reassign-executors)] - [port (uuid)]))] - ;; 1. to kill are those in allocated that are dead or disallowed - ;; 2. kill the ones that should be dead - ;; - read pids, kill -9 and individually remove file - ;; - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) - ;; 3. of the rest, figure out what assignments aren't yet satisfied - ;; 4. generate new worker ids, write new "approved workers" to LS - ;; 5. create local dir for worker id - ;; 5. launch new workers (give worker-id, port, and supervisor-id) - ;; 6. wait for workers launch - - (log-debug "Syncing processes") - (log-debug "Assigned executors: " assigned-executors) - (log-debug "Allocated: " allocated) - (doseq [[id [state heartbeat]] allocated] - (when (not= :valid state) - (log-message - "Shutting down and clearing state for id " id - ". Current supervisor time: " now - ". 
State: " state - ", Heartbeat: " (pr-str heartbeat)) - (shutdown-worker supervisor id))) - (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)] - (ls-approved-workers! local-state - (merge - (select-keys (ls-approved-workers local-state) - (keys keepers)) - valid-new-worker-ids)) - (wait-for-workers-launch conf (keys valid-new-worker-ids))))) - -(defn assigned-storm-ids-from-port-assignments [assignment] - (->> assignment - vals - (map :storm-id) - set)) - -(defn shutdown-disallowed-workers [supervisor] - (let [conf (:conf supervisor) - ^LocalState local-state (:local-state supervisor) - assigned-executors (defaulted (ls-local-assignments local-state) {}) - now (current-time-secs) - allocated (read-allocated-workers supervisor assigned-executors now) - disallowed (keys (filter-val - (fn [[state _]] (= state :disallowed)) - allocated))] - (log-debug "Allocated workers " allocated) - (log-debug "Disallowed workers " disallowed) - (doseq [id disallowed] - (shutdown-worker supervisor id)) - )) - -(defn get-blob-localname - "Given the blob information either gets the localname field if it exists, - else routines the default value passed in." - [blob-info defaultValue] - (or (get blob-info "localname") defaultValue)) - -(defn should-uncompress-blob? - "Given the blob information returns the value of the uncompress field, handling it either being - a string or a boolean value, or if it's not specified then returns false" - [blob-info] - (Boolean. (get blob-info "uncompress"))) - -(defn remove-blob-references - "Remove a reference to a blob when its no longer needed." 
- [localizer storm-id conf] - (let [storm-conf (read-supervisor-storm-conf conf storm-id) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - topo-name (storm-conf TOPOLOGY-NAME)] - (if blobstore-map - (doseq [[k, v] blobstore-map] - (.removeBlobReference localizer - k - user - topo-name - (should-uncompress-blob? v)))))) - -(defn blobstore-map-to-localresources - "Returns a list of LocalResources based on the blobstore-map passed in." - [blobstore-map] - (if blobstore-map - (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? v))) - ())) - -(defn add-blob-references - "For each of the downloaded topologies, adds references to the blobs that the topologies are - using. This is used to reconstruct the cache on restart." - [localizer storm-id conf] - (let [storm-conf (read-supervisor-storm-conf conf storm-id) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - topo-name (storm-conf TOPOLOGY-NAME) - localresources (blobstore-map-to-localresources blobstore-map)] - (if blobstore-map - (.addReferences localizer localresources user topo-name)))) - -(defn rm-topo-files - [conf storm-id localizer rm-blob-refs?] - (let [path (supervisor-stormdist-root conf storm-id)] - (try - (if rm-blob-refs? - (remove-blob-references localizer storm-id conf)) - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (rmr-as-user conf storm-id path) - (rmr (supervisor-stormdist-root conf storm-id))) - (catch Exception e - (log-message e (str "Exception removing: " storm-id)))))) - -(defn verify-downloaded-files - "Check for the files exists to avoid supervisor crashing - Also makes sure there is no necessity for locking" - [conf localizer assigned-storm-ids all-downloaded-storm-ids] - (remove nil? - (into #{} - (for [storm-id all-downloaded-storm-ids - :when (contains? assigned-storm-ids storm-id)] - (when-not (required-topo-files-exist? 
conf storm-id) - (log-debug "Files not present in topology directory") - (rm-topo-files conf storm-id localizer false) - storm-id))))) - -(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager] - (fn this [] - (let [conf (:conf supervisor) - storm-cluster-state (:storm-cluster-state supervisor) - ^ISupervisor isupervisor (:isupervisor supervisor) - ^LocalState local-state (:local-state supervisor) - sync-callback (fn [& ignored] (.add event-manager this)) - assignment-versions @(:assignment-versions supervisor) - {assignments-snapshot :assignments - storm-id->profiler-actions :profiler-actions - versions :versions} - (assignments-snapshot storm-cluster-state sync-callback assignment-versions) - storm-code-map (read-storm-code-locations assignments-snapshot) - all-downloaded-storm-ids (set (read-downloaded-storm-ids conf)) - existing-assignment (ls-local-assignments local-state) - all-assignment (read-assignments assignments-snapshot - (:assignment-id supervisor) - existing-assignment - (:sync-retry supervisor)) - new-assignment (->> all-assignment - (filter-key #(.confirmAssigned isupervisor %))) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) - localizer (:localizer supervisor) - checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids)) - downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)] - - (log-debug "Synchronizing supervisor") - (log-debug "Storm code map: " storm-code-map) - (log-debug "All assignment: " all-assignment) - (log-debug "New assignment: " new-assignment) - (log-debug "Assigned Storm Ids " assigned-storm-ids) - (log-debug "All Downloaded Ids " all-downloaded-storm-ids) - (log-debug "Checked Downloaded Ids " checked-downloaded-storm-ids) - (log-debug "Downloaded Ids " downloaded-storm-ids) - (log-debug "Storm Ids Profiler Actions " storm-id->profiler-actions) - ;; download 
code first - ;; This might take awhile - ;; - should this be done separately from usual monitoring? - ;; should we only download when topology is assigned to this supervisor? - (doseq [[storm-id master-code-dir] storm-code-map] - (when (and (not (downloaded-storm-ids storm-id)) - (assigned-storm-ids storm-id)) - (log-message "Downloading code for storm id " storm-id) - (try-cause - (download-storm-code conf storm-id master-code-dir localizer) - - (catch NimbusLeaderNotFoundException e - (log-warn-error e "Nimbus leader was not available.")) - (catch TTransportException e - (log-warn-error e "There was a connection problem with nimbus."))) - (log-message "Finished downloading code for storm id " storm-id))) - - (log-debug "Writing new assignment " - (pr-str new-assignment)) - (doseq [p (set/difference (set (keys existing-assignment)) - (set (keys new-assignment)))] - (.killedWorker isupervisor (int p))) - (.assigned isupervisor (keys new-assignment)) - (ls-local-assignments! local-state - new-assignment) - (reset! (:assignment-versions supervisor) versions) - (reset! (:stormid->profiler-actions supervisor) storm-id->profiler-actions) - - (reset! (:curr-assignment supervisor) new-assignment) - ;; remove any downloaded code that's no longer assigned or active - ;; important that this happens after setting the local assignment so that - ;; synchronize-supervisor doesn't try to launch workers for which the - ;; resources don't exist - (if on-windows? (shutdown-disallowed-workers supervisor)) - (doseq [storm-id all-downloaded-storm-ids] - (when-not (storm-code-map storm-id) - (log-message "Removing code for storm id " - storm-id) - (rm-topo-files conf storm-id localizer true))) - (.add processes-event-manager sync-processes)))) - -(defn mk-supervisor-capacities - [conf] - {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB)) - Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))}) - -(defn update-blobs-for-topology! 
- "Update each blob listed in the topology configuration if the latest version of the blob - has not been downloaded." - [conf storm-id localizer] - (let [storm-conf (read-supervisor-storm-conf conf storm-id) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - localresources (blobstore-map-to-localresources blobstore-map)] - (try - (.updateBlobs localizer localresources user) - (catch AuthorizationException authExp - (log-error authExp)) - (catch KeyNotFoundException knf - (log-error knf))))) - -(defn update-blobs-for-all-topologies-fn - "Returns a function that downloads all blobs listed in the topology configuration for all topologies assigned - to this supervisor, and creates version files with a suffix. The returned function is intended to be run periodically - by a timer, created elsewhere." - [supervisor] - (fn [] - (try-cause - (let [conf (:conf supervisor) - downloaded-storm-ids (set (read-downloaded-storm-ids conf)) - new-assignment @(:curr-assignment supervisor) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)] - (doseq [topology-id downloaded-storm-ids] - (let [storm-root (supervisor-stormdist-root conf topology-id)] - (when (assigned-storm-ids topology-id) - (log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root) - (update-blobs-for-topology! conf topology-id (:localizer supervisor)))))) - (catch TTransportException e - (log-error - e - "Network error while updating blobs, will retry again later")) - (catch NimbusLeaderNotFoundException e - (log-error - e - "Nimbus unavailable to update blobs, will retry again later"))))) - -(defn jvm-cmd [cmd] - (let [java-home (.get (System/getenv) "JAVA_HOME")] - (if (nil? 
java-home) - cmd - (str java-home file-path-separator "bin" file-path-separator cmd)))) - -(defn java-cmd [] - (jvm-cmd "java")) - -(defn jmap-dump-cmd [profile-cmd pid target-dir] - [profile-cmd pid "jmap" target-dir]) - -(defn jstack-dump-cmd [profile-cmd pid target-dir] - [profile-cmd pid "jstack" target-dir]) - -(defn jprofile-start [profile-cmd pid] - [profile-cmd pid "start"]) - -(defn jprofile-stop [profile-cmd pid target-dir] - [profile-cmd pid "stop" target-dir]) - -(defn jprofile-dump [profile-cmd pid workers-artifacts-directory] - [profile-cmd pid "dump" workers-artifacts-directory]) - -(defn jprofile-jvm-restart [profile-cmd pid] - [profile-cmd pid "kill"]) - -(defn- delete-topology-profiler-action [storm-cluster-state storm-id profile-action] - (log-message "Deleting profiler action.." profile-action) - (.delete-topology-profile-requests storm-cluster-state storm-id profile-action)) - -(defnk launch-profiler-action-for-worker - "Launch profiler action for a worker" - [conf user target-dir command :environment {} :exit-code-on-profile-action nil :log-prefix nil] - (if-let [run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)] - (let [container-file (container-file-path target-dir) - script-file (script-file-path target-dir)] - (log-message "Running as user:" user " command:" (shell-cmd command)) - (if (exists-file? container-file) (rmr-as-user conf container-file container-file)) - (if (exists-file? script-file) (rmr-as-user conf script-file script-file)) - (worker-launcher - conf - user - ["profiler" target-dir (write-script target-dir command :environment environment)] - :log-prefix log-prefix - :exit-code-callback exit-code-on-profile-action - :directory (File. target-dir))) - (launch-process - command - :environment environment - :log-prefix log-prefix - :exit-code-callback exit-code-on-profile-action - :directory (File. 
target-dir)))) - -(defn mk-run-profiler-actions-for-all-topologies - "Returns a function that downloads all profile-actions listed for all topologies assigned - to this supervisor, executes those actions as user and deletes them from zookeeper." - [supervisor] - (fn [] - (try - (let [conf (:conf supervisor) - stormid->profiler-actions @(:stormid->profiler-actions supervisor) - storm-cluster-state (:storm-cluster-state supervisor) - hostname (:my-hostname supervisor) - profile-cmd (conf WORKER-PROFILER-COMMAND) - new-assignment @(:curr-assignment supervisor) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)] - (doseq [[storm-id profiler-actions] stormid->profiler-actions] - (when (not (empty? profiler-actions)) - (doseq [pro-action profiler-actions] - (if (= hostname (:host pro-action)) - (let [port (:port pro-action) - action ^ProfileAction (:action pro-action) - stop? (> (System/currentTimeMillis) (:timestamp pro-action)) - target-dir (worker-artifacts-root conf storm-id port) - storm-conf (read-supervisor-storm-conf conf storm-id) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] env {}) - worker-pid (slurp (worker-artifacts-pid-path conf storm-id port)) - log-prefix (str "ProfilerAction process " storm-id ":" port " PROFILER_ACTION: " action " ") - ;; Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted - ;; The profiler plugin script validates if JVM is recording before starting another recording. - command (cond - (= action ProfileAction/JMAP_DUMP) (jmap-dump-cmd profile-cmd worker-pid target-dir) - (= action ProfileAction/JSTACK_DUMP) (jstack-dump-cmd profile-cmd worker-pid target-dir) - (= action ProfileAction/JPROFILE_DUMP) (jprofile-dump profile-cmd worker-pid target-dir) - (= action ProfileAction/JVM_RESTART) (jprofile-jvm-restart profile-cmd worker-pid) - (and (not stop?) 
- (= action ProfileAction/JPROFILE_STOP)) - (jprofile-start profile-cmd worker-pid) ;; Ensure the profiler is still running - (and stop? (= action ProfileAction/JPROFILE_STOP)) (jprofile-stop profile-cmd worker-pid target-dir)) - action-on-exit (fn [exit-code] - (log-message log-prefix " profile-action exited for code: " exit-code) - (if (and (= exit-code 0) stop?) - (delete-topology-profiler-action storm-cluster-state storm-id pro-action))) - command (->> command (map str) (filter (complement empty?)))] - - (try - (launch-profiler-action-for-worker conf - user - target-dir - command - :environment environment - :exit-code-on-profile-action action-on-exit - :log-prefix log-prefix) - (catch IOException ioe - (log-error ioe - (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later."))) - (catch RuntimeException rte - (log-error rte - (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later.")))))))))) - (catch Exception e - (log-error e "Error running profiler actions, will retry again later"))))) - -;; in local state, supervisor stores who its current assignments are -;; another thread launches events to restart any dead processes if necessary -(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor] - (log-message "Starting Supervisor with conf " conf) - (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) - (FileUtils/cleanDirectory (File. 
(supervisor-tmp-dir conf))) - (let [supervisor (supervisor-data conf shared-context isupervisor) - [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)] - sync-processes (partial sync-processes supervisor) - synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager) - synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor) - downloaded-storm-ids (set (read-downloaded-storm-ids conf)) - run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor) - heartbeat-fn (fn [] (.supervisor-heartbeat! - (:storm-cluster-state supervisor) - (:supervisor-id supervisor) - (->SupervisorInfo (current-time-secs) - (:my-hostname supervisor) - (:assignment-id supervisor) - (keys @(:curr-assignment supervisor)) - ;; used ports - (.getMetadata isupervisor) - (conf SUPERVISOR-SCHEDULER-META) - ((:uptime supervisor)) - (:version supervisor) - (mk-supervisor-capacities conf))))] - (heartbeat-fn) - - ;; should synchronize supervisor so it doesn't launch anything after being down (optimization) - (schedule-recurring (:heartbeat-timer supervisor) - 0 - (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS) - heartbeat-fn) - (doseq [storm-id downloaded-storm-ids] - (add-blob-references (:localizer supervisor) storm-id - conf)) - ;; do this after adding the references so we don't try to clean things being used - (.startCleaner (:localizer supervisor)) - - (when (conf SUPERVISOR-ENABLE) - ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up - ;; to date even if callbacks don't all work exactly right - (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor))) - (schedule-recurring (:event-timer supervisor) - 0 - (conf SUPERVISOR-MONITOR-FREQUENCY-SECS) - (fn [] (.add processes-event-manager sync-processes))) - - ;; Blob update thread. 
Starts with 30 seconds delay, every 30 seconds - (schedule-recurring (:blob-update-timer supervisor) - 30 - 30 - (fn [] (.add event-manager synchronize-blobs-fn))) - - (schedule-recurring (:event-timer supervisor) - (* 60 5) - (* 60 5) - (fn [] (let [health-code (healthcheck/health-check conf) - ids (my-worker-ids conf)] - (if (not (= health-code 0)) - (do - (doseq [id ids] - (shutdown-worker supervisor id)) - (throw (RuntimeException. "Supervisor failed health check. Exiting."))))))) - - ;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds - (schedule-recurring (:event-timer supervisor) - 30 - 30 - (fn [] (.add event-manager run-profiler-actions-fn)))) - (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor)) - (reify - Shutdownable - (shutdown [this] - (log-message "Shutting down supervisor " (:supervisor-id supervisor)) - (reset! (:active supervisor) false) - (cancel-timer (:heartbeat-timer supervisor)) - (cancel-timer (:event-timer supervisor)) - (cancel-timer (:blob-update-timer supervisor)) - (.shutdown event-manager) - (.shutdown processes-event-manager) - (.shutdown (:localizer supervisor)) - (.disconnect (:storm-cluster-state supervisor))) - SupervisorDaemon - (get-conf [this] - conf) - (get-id [this] - (:supervisor-id supervisor)) - (shutdown-all-workers [this] - (let [ids (my-worker-ids conf)] - (doseq [id ids] - (shutdown-worker supervisor id) - ))) - DaemonCommon - (waiting? [this] - (or (not @(:active supervisor)) - (and - (timer-waiting? (:heartbeat-timer supervisor)) - (timer-waiting? (:event-timer supervisor)) - (every? (memfn waiting?) 
managers))) - )))) - -(defn kill-supervisor [supervisor] - (.shutdown supervisor) - ) - -(defn setup-storm-code-dir - [conf storm-conf dir] - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir)))) - -(defn setup-blob-permission - [conf storm-conf path] - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path)))) - -(defn download-blobs-for-topology! - "Download all blobs listed in the topology configuration for a given topology." - [conf stormconf-path localizer tmproot] - (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - topo-name (storm-conf TOPOLOGY-NAME) - user-dir (.getLocalUserFileCacheDir localizer user) - localresources (blobstore-map-to-localresources blobstore-map)] - (when localresources - (when-not (.exists user-dir) - (FileUtils/forceMkdir user-dir)) - (try - (let [localized-resources (.getBlobs localizer localresources user topo-name user-dir)] - (setup-blob-permission conf storm-conf (.toString user-dir)) - (doseq [local-rsrc localized-resources] - (let [rsrc-file-path (File. (.getFilePath local-rsrc)) - key-name (.getName rsrc-file-path) - blob-symlink-target-name (.getName (File. (.getCurrentSymlinkPath local-rsrc))) - symlink-name (get-blob-localname (get blobstore-map key-name) key-name)] - (create-symlink! tmproot (.getParent rsrc-file-path) symlink-name - blob-symlink-target-name)))) - (catch AuthorizationException authExp - (log-error authExp)) - (catch KeyNotFoundException knf - (log-error knf)))))) - -(defn get-blob-file-names - [blobstore-map] - (if blobstore-map - (for [[k, data] blobstore-map] - (get-blob-localname data k)))) - -(defn download-blobs-for-topology-succeed? 
- "Assert if all blobs are downloaded for the given topology" - [stormconf-path target-dir] - (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path)))) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - file-names (get-blob-file-names blobstore-map)] - (if-not (empty? file-names) - (every? #(Utils/checkFileExists target-dir %) file-names) - true))) - -;; distributed implementation -(defmethod download-storm-code - :distributed [conf storm-id master-code-dir localizer] - ;; Downloading to permanent location is atomic - (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid)) - stormroot (supervisor-stormdist-root conf storm-id) - blobstore (Utils/getClientBlobStoreForSupervisor conf)] - (FileUtils/forceMkdir (File. tmproot)) - (if-not on-windows? - (Utils/restrictPermissions tmproot) - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions")))) - (Utils/downloadResourcesAsSupervisor (master-stormjar-key storm-id) - (supervisor-stormjar-path tmproot) blobstore) - (Utils/downloadResourcesAsSupervisor (master-stormcode-key storm-id) - (supervisor-stormcode-path tmproot) blobstore) - (Utils/downloadResourcesAsSupervisor (master-stormconf-key storm-id) - (supervisor-stormconf-path tmproot) blobstore) - (.shutdown blobstore) - (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) - (download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer - tmproot) - (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot) - (do - (log-message "Successfully downloaded blob resources for storm-id " storm-id) - (FileUtils/forceMkdir (File. stormroot)) - (Files/move (.toPath (File. tmproot)) (.toPath (File. 
stormroot)) - (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE))) - (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)) - (do - (log-message "Failed to download blob resources for storm-id " storm-id) - (rmr tmproot))))) - -(defn write-log-metadata-to-yaml-file! [storm-id port data conf] - (let [file (get-log-metadata-file conf storm-id port)] - ;;run worker as user needs the directory to have special permissions - ;; or it is insecure - (when (not (.exists (.getParentFile file))) - (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (do (FileUtils/forceMkdir (.getParentFile file)) - (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) (.getCanonicalPath (.getParentFile file)))) - (.mkdirs (.getParentFile file)))) - (let [writer (java.io.FileWriter. file) - yaml (Yaml.)] - (try - (.dump yaml data writer) - (finally - (.close writer)))))) - -(defn write-log-metadata! [storm-conf user worker-id storm-id port conf] - (let [data {TOPOLOGY-SUBMITTER-USER user - "worker-id" worker-id - LOGS-GROUPS (sort (distinct (remove nil? - (concat - (storm-conf LOGS-GROUPS) - (storm-conf TOPOLOGY-GROUPS))))) - LOGS-USERS (sort (distinct (remove nil? - (concat - (storm-conf LOGS-USERS) - (storm-conf TOPOLOGY-USERS)))))}] - (write-log-metadata-to-yaml-file! 
storm-id port data conf))) - -(defn jlp [stormroot conf] - (let [resource-root (str stormroot File/separator RESOURCES-SUBDIR) - os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_") - arch (System/getProperty "os.arch") - arch-resource-root (str resource-root File/separator os "-" arch)] - (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH)))) - -(defn substitute-childopts - "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap" - [value worker-id topology-id port mem-onheap] - (let [replacement-map {"%ID%" (str port) - "%WORKER-ID%" (str worker-id) - "%TOPOLOGY-ID%" (str topology-id) - "%WORKER-PORT%" (str port) - "%HEAP-MEM%" (str mem-onheap)} - sub-fn #(reduce (fn [string entry] - (apply clojure.string/replace string entry)) - % - replacement-map)] - (cond - (nil? value) nil - (sequential? value) (vec (map sub-fn value)) - :else (-> value sub-fn (clojure.string/split #"\s+"))))) - - -(defn create-blobstore-links - "Create symlinks in worker launch directory for all blobs" - [conf storm-id worker-id] - (let [stormroot (supervisor-stormdist-root conf storm-id) - storm-conf (read-supervisor-storm-conf conf storm-id) - workerroot (worker-root conf worker-id) - blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) - blob-file-names (get-blob-file-names blobstore-map) - resource-file-names (cons RESOURCES-SUBDIR blob-file-names)] - (log-message "Creating symlinks for worker-id: " worker-id " storm-id: " - storm-id " for files(" (count resource-file-names) "): " (pr-str resource-file-names)) - (create-symlink! workerroot stormroot RESOURCES-SUBDIR) - (doseq [file-name blob-file-names] - (create-symlink! 
workerroot stormroot file-name file-name)))) - -(defn create-artifacts-link - "Create a symlink from workder directory to its port artifacts directory" - [conf storm-id port worker-id] - (let [worker-dir (worker-root conf worker-id) - topo-dir (worker-artifacts-root conf storm-id)] - (log-message "Creating symlinks for worker-id: " worker-id " storm-id: " - storm-id " to its port artifacts directory") - (if (.exists (File. worker-dir)) - (create-symlink! worker-dir topo-dir "artifacts" port)))) - -(defmethod launch-worker - :distributed [supervisor storm-id port worker-id mem-onheap] - (let [conf (:conf supervisor) - run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER) - storm-home (System/getProperty "storm.home") - storm-options (System/getProperty "storm.options") - storm-conf-file (System/getProperty "storm.conf.file") - storm-log-dir LOG-DIR - storm-log-conf-dir (conf STORM-LOG4J2-CONF-DIR) - storm-log4j2-conf-dir (if storm-log-conf-dir - (if (is-absolute-path? storm-log-conf-dir) - storm-log-conf-dir - (str storm-home file-path-separator storm-log-conf-dir)) - (str storm-home file-path-separator "log4j2")) - stormroot (supervisor-stormdist-root conf storm-id) - jlp (jlp stormroot conf) - stormjar (supervisor-stormjar-path stormroot) - storm-conf (read-supervisor-storm-conf conf storm-id) - topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)] - [cp] - []) - classpath (-> (worker-classpath) - (add-to-classpath [stormjar]) - (add-to-classpath topo-classpath)) - top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS) - mem-onheap (if (and mem-onheap (> mem-onheap 0)) ;; not nil and not zero - (int (Math/ceil mem-onheap)) ;; round up - (storm-conf WORKER-HEAP-MEMORY-MB)) ;; otherwise use default value - gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port mem-onheap) - topo-worker-logwriter-childopts (storm-conf TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS) - user (storm-conf TOPOLOGY-SUBMITTER-USER) - 
logfilename "worker.log" - workers-artifacts (worker-artifacts-root conf) - logging-sensitivity (storm-conf TOPOLOGY-LOGGING-SENSITIVITY "S3") - worker-childopts (when-let [s (conf WORKER-CHILDOPTS)] - (substitute-childopts s worker-id storm-id port mem-onheap)) - topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)] - (substitute-childopts s worker-id storm-id port mem-onheap)) - worker--profiler-childopts (if (conf WORKER-PROFILER-ENABLED) - (substitute-childopts (conf WORKER-PROFILER-CHILDOPTS) worker-id storm-id port mem-onheap) - "") - topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] - (merge env {"LD_LIBRARY_PATH" jlp}) - {"LD_LIBRARY_PATH" jlp}) - command (concat - [(java-cmd) "-cp" classpath - topo-worker-logwriter-childopts - (str "-Dlogfile.name=" logfilename) - (str "-Dstorm.home=" storm-home) - (str "-Dworkers.artifacts=" workers-artifacts) - (str "-Dstorm.id=" storm-id) - (str "-Dworker.id=" worker-id) - (str "-Dworker.port=" port) - (str "-Dstorm.log.dir=" storm-log-dir) - (str "-Dlog4j.configurationFile=" storm-log4j2-conf-dir file-path-separator "worker.xml") - (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector") - "backtype.storm.LogWriter"] - [(java-cmd) "-server"] - worker-childopts - topo-worker-childopts - gc-opts - worker--profiler-childopts - [(str "-Djava.library.path=" jlp) - (str "-Dlogfile.name=" logfilename) - (str "-Dstorm.home=" storm-home) - (str "-Dworkers.artifacts=" workers-artifacts) - (str "-Dstorm.conf.file=" storm-conf-file) - (str "-Dstorm.options=" storm-options) - (str "-Dstorm.log.dir=" storm-log-dir) - (str "-Dlogging.sensitivity=" logging-sensitivity) - (str "-Dlog4j.configurationFile=" storm-log4j2-conf-dir file-path-separator "worker.xml") - (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector") - (str "-Dstorm.id=" storm-id) - (str "-Dworker.id=" worker-id) - (str "-Dworker.port=" port) - "-cp" 
classpath - "backtype.storm.daemon.worker" - storm-id - (:assignment-id supervisor) - port - worker-id]) - command (->> command (map str) (filter (complement empty?)))] - (log-message "Launching worker with command: " (shell-cmd command)) - (write-log-metadata! storm-conf user worker-id storm-id port conf) - (set-worker-user! conf worker-id user) - (create-artifacts-link conf storm-id port worker-id) - (let [log-prefix (str "Worker Process " worker-id) - callback (fn [exit-code] - (log-message log-prefix " exited with code: " exit-code) - (add-dead-worker worker-id)) - worker-dir (worker-root conf worker-id)] - (remove-dead-worker worker-id) - (create-blobstore-links conf storm-id worker-id) - (if run-worker-as-user - (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)) - (launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir))) - ))) - -;; local implementation - -(defn resources-jar [] - (->> (.split (current-classpath) File/pathSeparator) - (filter #(.endsWith % ".jar")) - (filter #(zip-contains-dir? % RESOURCES-SUBDIR)) - first )) - -(defmethod download-storm-code - :local [conf storm-id master-code-dir localizer] - (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid)) - stormroot (supervisor-stormdist-root conf storm-id) - blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)] - (try - (FileUtils/forceMkdir (File. tmproot)) - (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil) - (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil) - (finally - (.shutdown blob-store))) - (FileUtils/moveDirectory (File. tmproot) (File. 
stormroot)) - (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot) - (let [classloader (.getContextClassLoader (Thread/currentThread)) - resources-jar (resources-jar) - url (.getResource classloader RESOURCES-SUBDIR) - target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)] - (cond - resources-jar - (do - (log-message "Extracting resources from jar at " resources-jar " to " target-dir) - (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot)) - url - (do - (log-message "Copying resources at " (str url) " to " target-dir) - (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir))))))) - -(defmethod launch-worker - :local [supervisor storm-id port worker-id mem-onheap] - (let [conf (:conf supervisor) - pid (uuid) - worker (worker/mk-worker conf - (:shared-context supervisor) - storm-id - (:assignment-id supervisor) - port - worker-id)] - (set-worker-user! conf worker-id "") - (psim/register-process pid worker) - (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid) - )) - -(defn -launch - [supervisor] - (log-message "Starting supervisor for storm version '" STORM-VERSION "'") - (let [conf (read-storm-config)] - (validate-distributed-mode! conf) - (let [supervisor (mk-supervisor conf nil supervisor)] - (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor))) - (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf))) - (start-metrics-reporters))) - -(defn standalone-supervisor [] - (let [conf-atom (atom nil) - id-atom (atom nil)] - (reify ISupervisor - (prepare [this conf local-dir] - (reset! conf-atom conf) - (let [state (LocalState. local-dir) - curr-id (if-let [id (ls-supervisor-id state)] - id - (generate-supervisor-id))] - (ls-supervisor-id! state curr-id) - (reset! 
id-atom curr-id)) - ) - (confirmAssigned [this port] - true) - (getMetadata [this] - (doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS)))) - (getSupervisorId [this] - @id-atom) - (getAssignmentId [this] - @id-atom) - (killedWorker [this port] - ) - (assigned [this ports] - )))) - -(defn -main [] - (setup-default-uncaught-exception-handler) - (-launch (standalone-supervisor)))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj deleted file mode 100644 index 7133fdf..0000000 --- a/storm-core/src/clj/backtype/storm/daemon/task.clj +++ /dev/null @@ -1,189 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
(ns backtype.storm.daemon.task
  (:use [backtype.storm.daemon common])
  (:use [backtype.storm config util log])
  (:import [backtype.storm.hooks ITaskHook])
  (:import [backtype.storm.tuple Tuple TupleImpl])
  (:import [backtype.storm.grouping LoadMapping])
  (:import [backtype.storm.generated SpoutSpec Bolt StateSpoutSpec StormTopology])
  (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
            EmitInfo BoltFailInfo BoltAckInfo])
  (:import [backtype.storm.task TopologyContext ShellBolt WorkerTopologyContext])
  (:import [backtype.storm.utils Utils])
  (:import [backtype.storm.generated ShellComponent JavaObject])
  (:import [backtype.storm.spout ShellSpout])
  (:import [java.util Collection List ArrayList])
  (:require [backtype.storm
             [thrift :as thrift]
             [stats :as stats]])
  (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics]))

(defn mk-topology-context-builder
  "Returns a one-argument fn that, given a task id, constructs a TopologyContext
  exposing `topology`. All other constructor arguments are read from the
  `worker` and `executor-data` maps; the task id is coerced with `int`."
  [worker executor-data topology]
  (let [conf (:conf worker)]
    #(TopologyContext.
      topology
      (:storm-conf worker)
      (:task->component worker)
      (:component->sorted-tasks worker)
      (:component->stream->fields worker)
      (:storm-id worker)
      (supervisor-storm-resources-path
        (supervisor-stormdist-root conf (:storm-id worker)))
      (worker-pids-root conf (:worker-id worker))
      (int %)
      (:port worker)
      (:task-ids worker)
      (:default-shared-resources worker)
      (:user-shared-resources worker)
      (:shared-executor-data executor-data)
      (:interval->task->metric-registry executor-data)
      (:open-or-prepare-was-called? executor-data))))

(defn system-topology-context
  "TopologyContext for task `tid` built over the worker's :system-topology."
  [worker executor-data tid]
  ((mk-topology-context-builder
     worker
     executor-data
     (:system-topology worker))
   tid))

(defn user-topology-context
  "TopologyContext for task `tid` built over the worker's :topology."
  [worker executor-data tid]
  ((mk-topology-context-builder
     worker
     executor-data
     (:topology worker))
   tid))

(defn- get-task-object
  "Looks up `component-id` among the topology's spouts, bolts and state spouts and
  returns the deserialized component object. ShellComponents are wrapped in a
  ShellSpout (if the id is a spout) or ShellBolt; JavaObjects are instantiated
  via thrift. Throws a RuntimeException if the component id is unknown."
  [^StormTopology topology component-id]
  (let [spouts (.get_spouts topology)
        bolts (.get_bolts topology)
        state-spouts (.get_state_spouts topology)
        obj (Utils/getSetComponentObject
              (cond
                (contains? spouts component-id)
                (.get_spout_object ^SpoutSpec (get spouts component-id))

                (contains? bolts component-id)
                (.get_bolt_object ^Bolt (get bolts component-id))

                (contains? state-spouts component-id)
                (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))

                :else
                (throw-runtime "Could not find " component-id " in " topology)))
        obj (if (instance? ShellComponent obj)
              (if (contains? spouts component-id)
                (ShellSpout. obj)
                (ShellBolt. obj))
              obj)
        obj (if (instance? JavaObject obj)
              (thrift/instantiate-java-object obj)
              obj)]
    obj))

(defn get-context-hooks
  "Returns the hooks registered on the given TopologyContext."
  [^TopologyContext context]
  (.getHooks context))

(defn hooks-empty?
  "True when the hook collection has no elements."
  [^Collection hooks]
  (.isEmpty hooks))

(defmacro apply-hooks
  "Invokes `method-sym` on every ITaskHook registered on `topology-context`,
  passing the value of `info-form`. `info-form` is only evaluated when at least
  one hook is registered, so callers can build the info object inline cheaply."
  [topology-context method-sym info-form]
  (let [hook-sym (with-meta (gensym "hook") {:tag 'backtype.storm.hooks.ITaskHook})]
    `(let [hooks# (get-context-hooks ~topology-context)]
       (when-not (hooks-empty? hooks#)
         (let [info# ~info-form]
           (fast-list-iter [~hook-sym hooks#]
             (~method-sym ~hook-sym info#)))))))

;; TODO: this is all expensive... should be precomputed
(defn send-unanchored
  "Builds a TupleImpl (from the task's system context) carrying `values` on
  `stream` and hands it to the executor's transfer-fn once per target task
  returned by the task's tasks-fn."
  [task-data stream values]
  (let [^TopologyContext topology-context (:system-context task-data)
        tasks-fn (:tasks-fn task-data)
        transfer-fn (-> task-data :executor-data :transfer-fn)
        out-tuple (TupleImpl. topology-context
                              values
                              (.getThisTaskId topology-context)
                              stream)]
    (fast-list-iter [t (tasks-fn stream values)]
      (transfer-fn t out-tuple))))

(defn mk-tasks-fn
  "Returns the routing fn for a task. Two arities:
  - [out-task-id stream values]: direct emit. Returns [out-task-id], or nil when
    the target component does not subscribe to `stream`. Throws
    IllegalArgumentException if the target subscribes with a non-direct grouping.
  - [stream values]: regular emit. Returns an ArrayList of target task ids gathered
    from each subscribing component's grouper. Throws IllegalArgumentException if
    any subscriber uses the :direct grouping.
  Both arities fire the user context's .emit hooks and, when the stats sampler
  fires, record emitted/transferred counts."
  [task-data]
  (let [task-id (:task-id task-data)
        executor-data (:executor-data task-data)
        ^LoadMapping load-mapping (:load-mapping (:worker executor-data))
        component-id (:component-id executor-data)
        ^WorkerTopologyContext worker-context (:worker-context executor-data)
        storm-conf (:storm-conf executor-data)
        emit-sampler (mk-stats-sampler storm-conf)
        stream->component->grouper (:stream->component->grouper executor-data)
        user-context (:user-context task-data)
        executor-stats (:stats executor-data)
        debug? (= true (storm-conf TOPOLOGY-DEBUG))]

    (fn ([^Integer out-task-id ^String stream ^List values]
          (when debug?
            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
          (let [target-component (.getComponentId worker-context out-task-id)
                component->grouping (get stream->component->grouper stream)
                grouping (get component->grouping target-component)
                ;; nil when the target component does not subscribe to this stream:
                ;; the tuple is dropped in that case
                out-task-id (if grouping out-task-id)]
            (when (and (not-nil? grouping) (not= :direct grouping))
              (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
            ;; Report only the tasks actually sent to: an empty list rather than
            ;; [nil] when the tuple is dropped, so hooks never see a null task id.
            (apply-hooks user-context .emit
                         (EmitInfo. values stream task-id (if out-task-id [out-task-id] [])))
            (when (emit-sampler)
              (stats/emitted-tuple! executor-stats stream)
              (if out-task-id
                (stats/transferred-tuples! executor-stats stream 1)))
            (if out-task-id [out-task-id])))
        ([^String stream ^List values]
          (when debug?
            (log-message "Emitting: " component-id " " stream " " values))
          (let [out-tasks (ArrayList.)]
            (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
              (when (= :direct grouper)
                ;; TODO: this is wrong, need to check how the stream was declared
                (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
              (let [comp-tasks (grouper task-id values load-mapping)]
                ;; a grouper may return either a single task id or a collection of them
                (if (or (sequential? comp-tasks) (instance? Collection comp-tasks))
                  (.addAll out-tasks comp-tasks)
                  (.add out-tasks comp-tasks))))
            (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
            (when (emit-sampler)
              (stats/emitted-tuple! executor-stats stream)
              (stats/transferred-tuples! executor-stats stream (count out-tasks)))
            out-tasks)))))

(defn mk-task-data
  "Builds the task-data map for `task-id`. Later entries (:tasks-fn, :object)
  refer back to the map under construction via `<>` (recursive-map)."
  [executor-data task-id]
  (recursive-map
    :executor-data executor-data
    :task-id task-id
    :system-context (system-topology-context (:worker executor-data) executor-data task-id)
    :user-context (user-topology-context (:worker executor-data) executor-data task-id)
    :builtin-metrics (builtin-metrics/make-data (:type executor-data) (:stats executor-data))
    :tasks-fn (mk-tasks-fn <>)
    :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))


(defn mk-task
  "Creates the task-data map, registers every hook class named in
  TOPOLOGY-AUTO-TASK-HOOKS on the user context, then sends a [\"startup\"]
  tuple on SYSTEM-STREAM-ID. Returns the task-data map."
  [executor-data task-id]
  (let [task-data (mk-task-data executor-data task-id)
        storm-conf (:storm-conf executor-data)]
    (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
      (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
    ;; when this is called, the threads for the executor haven't been started yet,
    ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
    (send-unanchored task-data SYSTEM-STREAM-ID ["startup"])
    task-data))
