http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj new file mode 100644 index 0000000..5aef266 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/cluster.clj @@ -0,0 +1,691 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
(ns org.apache.storm.cluster
  (:import [org.apache.zookeeper.data Stat ACL Id]
           [org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
            LogConfig ProfileAction ProfileRequest NodeInfo]
           [java.io Serializable])
  (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
  (:import [org.apache.curator.framework CuratorFramework])
  (:import [org.apache.storm.utils Utils])
  (:import [org.apache.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState])
  (:import [java.security MessageDigest])
  (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
  (:import [org.apache.storm.nimbus NimbusInfo])
  (:use [org.apache.storm util log config converter])
  (:require [org.apache.storm [zookeeper :as zk]])
  (:require [org.apache.storm.daemon [common :as common]]))

(defn mk-topo-only-acls
  "Builds the ACLs used for topology-private znodes (set-credentials! uses them):
  the first entry of ZooDefs$Ids/CREATOR_ALL_ACL plus READ for the digest
  generated from the topology's STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD.
  Returns nil when ZooKeeper authentication is not configured for the topology."
  [topo-conf]
  (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)]
    (when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
      [(first ZooDefs$Ids/CREATOR_ALL_ACL)
       (ACL. ZooDefs$Perms/READ (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))])))

;; Loads the factory class named by STORM-CLUSTER-STATE-STORE (falling back to the
;; ZooKeeper-backed factory), instantiates it, and returns whatever its mkState
;; builds from conf, auth-conf, acls and context.
(defnk mk-distributed-cluster-state
  [conf :auth-conf nil :acls nil :context (ClusterStateContext.)]
  (let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE)
                                 "org.apache.storm.cluster_state.zookeeper_state_factory"))
        state-instance (.newInstance clazz)]
    (log-debug "Creating cluster state: " (.toString clazz))
    (.mkState state-instance conf auth-conf acls context)))

(defprotocol StormClusterState
  "Storm-level read/write operations on cluster state. The implementation
  returned by mk-storm-cluster-state keeps its data under the *-SUBTREE paths
  of this namespace. Methods taking a `callback` register it as a one-shot
  watch callback when it is non-nil."
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  ;returns key information under /storm/blobstore/key
  (blobstore-info [this blob-key])
  ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
  (nimbuses [this])
  ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
  (add-nimbus-host! [this nimbus-id nimbus-summary])

  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (get-worker-profile-requests [this storm-id nodeinfo thrift?])
  (get-topology-profile-requests [this storm-id thrift?])
  (set-worker-profile-request [this storm-id profile-request])
  (delete-topology-profile-requests [this storm-id profile-request])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (set-topology-log-config! [this storm-id log-config])
  (topology-log-config [this storm-id cb])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (worker-backpressure! [this storm-id node port info])
  (topology-backpressure [this storm-id callback])
  (setup-backpressure! [this storm-id])
  (remove-worker-backpressure! [this storm-id node port])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  ;; sets up information related to key consisting of nimbus
  ;; host:port and version info of the blob
  (setup-blobstore! [this key nimbusInfo versionInfo])
  (active-keys [this])
  (blobstore [this callback])
  (remove-storm! [this storm-id])
  (remove-blobstore-key! [this blob-key])
  (remove-key-version! [this blob-key])
  (report-error [this storm-id component-id node port error])
  (errors [this storm-id component-id])
  (last-error [this storm-id component-id])
  (set-credentials! [this storm-id creds topo-conf])
  (credentials [this storm-id callback])
  (disconnect [this]))

(def ASSIGNMENTS-ROOT "assignments")
(def CODE-ROOT "code")
(def STORMS-ROOT "storms")
(def SUPERVISORS-ROOT "supervisors")
(def WORKERBEATS-ROOT "workerbeats")
(def BACKPRESSURE-ROOT "backpressure")
(def ERRORS-ROOT "errors")
(def BLOBSTORE-ROOT "blobstore")
; Stores the latest update sequence for a blob
(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber")
(def NIMBUSES-ROOT "nimbuses")
(def CREDENTIALS-ROOT "credentials")
(def LOGCONFIG-ROOT "logconfigs")
(def PROFILERCONFIG-ROOT "profilerconfigs")

(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
;; Blobstore subtree /storm/blobstore
(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT))
(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT))
(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
(def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT))
(def PROFILERCONFIG-SUBTREE (str "/" PROFILERCONFIG-ROOT))

(defn supervisor-path
  [id]
  (str SUPERVISORS-SUBTREE "/" id))

(defn assignment-path
  [id]
  (str ASSIGNMENTS-SUBTREE "/" id))

(defn blobstore-path
  [key]
  (str BLOBSTORE-SUBTREE "/" key))

(defn blobstore-max-key-sequence-number-path
  [key]
  (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key))

(defn nimbus-path
  [id]
  (str NIMBUSES-SUBTREE "/" id))

(defn storm-path
  [id]
  (str STORMS-SUBTREE "/" id))

(defn workerbeat-storm-root
  [storm-id]
  (str WORKERBEATS-SUBTREE "/" storm-id))

(defn workerbeat-path
  [storm-id node port]
  (str (workerbeat-storm-root storm-id) "/" node "-" port))

(defn backpressure-storm-root
  [storm-id]
  (str BACKPRESSURE-SUBTREE "/" storm-id))

(defn backpressure-path
  [storm-id node port]
  (str (backpressure-storm-root storm-id) "/" node "-" port))

(defn error-storm-root
  [storm-id]
  (str ERRORS-SUBTREE "/" storm-id))

(defn error-path
  [storm-id component-id]
  (str (error-storm-root storm-id) "/" (url-encode component-id)))

(def last-error-path-seg "last-error")

(defn last-error-path
  [storm-id component-id]
  (str (error-storm-root storm-id)
       "/"
       (url-encode component-id)
       "-"
       last-error-path-seg))

(defn credentials-path
  [storm-id]
  (str CREDENTIALS-SUBTREE "/" storm-id))

(defn log-config-path
  [storm-id]
  (str LOGCONFIG-SUBTREE "/" storm-id))

(defn profiler-config-path
  ([storm-id]
   (str PROFILERCONFIG-SUBTREE "/" storm-id))
  ([storm-id host port request-type]
   (str (profiler-config-path storm-id) "/" host "_" port "_" request-type)))

(defn- issue-callback!
  "Clears the single-slot callback held in `cb-atom` and, if one was set, invokes it with no args."
  [cb-atom]
  (let [cb @cb-atom]
    (reset! cb-atom nil)
    (when cb
      (cb))))

(defn- issue-map-callback!
  "Removes the callback registered under `id` in the map held by `cb-atom` and,
  if one was set, invokes it with `id`.
  NOTE(review): the read and the dissoc are separate steps, so a callback
  registered for `id` between them is dropped without being called."
  [cb-atom id]
  (let [cb (@cb-atom id)]
    (swap! cb-atom dissoc id)
    (when cb
      (cb id))))

(defn- maybe-deserialize
  "Deserializes `ser` into `clazz` via Utils/deserialize; nil when `ser` is nil."
  [ser clazz]
  (when ser
    (Utils/deserialize ser clazz)))

(defrecord TaskError [error time-secs host port])

(defn- parse-error-path
  "Error children are created by report-error as sequential nodes named
  \"e<sequence>\"; this drops the leading \"e\" and parses the sequence number."
  [^String p]
  (Long/parseLong (.substring p 1)))

(defn convert-executor-beats
  "Ensures that we only return heartbeats for executors assigned to
  this worker."
  [executors worker-hb]
  (let [executor-stats (:executor-stats worker-hb)]
    (->> executors
         (map (fn [t]
                (when (contains? executor-stats t)
                  {t {:time-secs (:time-secs worker-hb)
                      :uptime (:uptime worker-hb)
                      :stats (get executor-stats t)}})))
         (into {}))))

;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
;;
;; Builds a StormClusterState over `cluster-state-spec`. If it is already a
;; ClusterState it is used as-is and `disconnect` does not close it; otherwise it
;; is treated as a conf and a new state is created via mk-distributed-cluster-state,
;; which `disconnect` closes.
(defnk mk-storm-cluster-state
  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
        ;; One-shot callbacks: map-valued atoms are keyed by the path segment
        ;; after the subtree (e.g. storm-id); nil-valued atoms hold one callback.
        assignment-info-callback (atom {})
        assignment-info-with-version-callback (atom {})
        assignment-version-callback (atom {})
        supervisors-callback (atom nil)
        backpressure-callback (atom {}) ;; we want to register a topo directory getChildren callback for all workers of this dir
        assignments-callback (atom nil)
        storm-base-callback (atom {})
        blobstore-callback (atom nil)
        credentials-callback (atom {})
        log-config-callback (atom {})
        ;; Route each watch event to the callback(s) registered for its subtree.
        state-id (.register
                  cluster-state
                  (fn [type path]
                    (let [[subtree & args] (tokenize-path path)]
                      (condp = subtree
                        ASSIGNMENTS-ROOT (if (empty? args)
                                           (issue-callback! assignments-callback)
                                           (do
                                             (issue-map-callback! assignment-info-callback (first args))
                                             (issue-map-callback! assignment-version-callback (first args))
                                             (issue-map-callback! assignment-info-with-version-callback (first args))))
                        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                        BLOBSTORE-ROOT (issue-callback! blobstore-callback) ;; callback register for blobstore
                        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                        CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
                        LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args))
                        BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args))
                        ;; this should never happen
                        (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
               LOGCONFIG-SUBTREE]]
      (.mkdirs cluster-state p acls))
    (reify
      StormClusterState

      (assignments
        [this callback]
        (when callback
          (reset! assignments-callback callback))
        (.get_children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))

      (assignment-info
        [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (clojurify-assignment (maybe-deserialize (.get_data cluster-state (assignment-path storm-id) (not-nil? callback)) Assignment)))

      (assignment-info-with-version
        [this storm-id callback]
        (when callback
          (swap! assignment-info-with-version-callback assoc storm-id callback))
        (let [{data :data version :version}
              (.get_data_with_version cluster-state (assignment-path storm-id) (not-nil? callback))]
          {:data (clojurify-assignment (maybe-deserialize data Assignment))
           :version version}))

      (assignment-version
        [this storm-id callback]
        (when callback
          (swap! assignment-version-callback assoc storm-id callback))
        (.get_version cluster-state (assignment-path storm-id) (not-nil? callback)))

      ;; blobstore state
      (blobstore
        [this callback]
        (when callback
          (reset! blobstore-callback callback))
        (.sync_path cluster-state BLOBSTORE-SUBTREE)
        (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? callback)))

      (nimbuses
        [this]
        (map #(maybe-deserialize (.get_data cluster-state (nimbus-path %) false) NimbusSummary)
             (.get_children cluster-state NIMBUSES-SUBTREE false)))

      (add-nimbus-host!
        [this nimbus-id nimbus-summary]
        ;; explicit delete for ephemeral node to ensure this session creates the entry.
        (.delete_node cluster-state (nimbus-path nimbus-id))

        ;; Ephemeral nodes belong to a session, so re-write the entry after a reconnect.
        (.add_listener cluster-state (reify ClusterStateListener
                                       (^void stateChanged [this ^ConnectionState newState]
                                         (log-message "Connection state listener invoked, zookeeper connection state has changed to " newState)
                                         (when (.equals newState ConnectionState/RECONNECTED)
                                           (log-message "Connection state has changed to reconnected so setting nimbuses entry one more time")
                                           (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls)))))

        (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))

      (setup-blobstore!
        [this key nimbusInfo versionInfo]
        (let [host-port (.toHostPortString nimbusInfo)
              key-path (blobstore-path key)
              path (str key-path "/" host-port "-" versionInfo)]
          (log-message "setup-path" path)
          (.mkdirs cluster-state key-path acls)
          ;; we delete the node first to ensure the node gets created as part of this session only.
          (.delete_node_blobstore cluster-state key-path host-port)
          (.set_ephemeral_node cluster-state path nil acls)))

      (blobstore-info
        [this blob-key]
        (let [path (blobstore-path blob-key)]
          (.sync_path cluster-state path)
          (.get_children cluster-state path false)))

      (active-storms
        [this]
        (.get_children cluster-state STORMS-SUBTREE false))

      (active-keys
        [this]
        (.get_children cluster-state BLOBSTORE-SUBTREE false))

      (heartbeat-storms
        [this]
        (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false))

      (error-topologies
        [this]
        (.get_children cluster-state ERRORS-SUBTREE false))

      (get-worker-heartbeat
        [this storm-id node port]
        (when-let [worker-hb (.get_worker_hb cluster-state (workerbeat-path storm-id node port) false)]
          (-> worker-hb
              (maybe-deserialize ClusterWorkerHeartbeat)
              clojurify-zk-worker-hb)))

      (executor-beats
        [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)))]
          (apply merge all-heartbeats)))

      (supervisors
        [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (.get_children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))

      (supervisor-info
        [this supervisor-id]
        (clojurify-supervisor-info (maybe-deserialize (.get_data cluster-state (supervisor-path supervisor-id) false) SupervisorInfo)))

      (topology-log-config
        [this storm-id cb]
        (when cb
          (swap! log-config-callback assoc storm-id cb))
        (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) (not-nil? cb)) LogConfig))

      (set-topology-log-config!
        [this storm-id log-config]
        (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls))

      (set-worker-profile-request
        [this storm-id profile-request]
        (let [request-type (.get_action profile-request)
              host (.get_node (.get_nodeInfo profile-request))
              port (first (.get_port (.get_nodeInfo profile-request)))]
          (.set_data cluster-state
                     (profiler-config-path storm-id host port request-type)
                     (Utils/serialize profile-request)
                     acls)))

      (get-topology-profile-requests
        [this storm-id thrift?]
        ;; nil when the topology has no profiler-config directory.
        (let [path (profiler-config-path storm-id)]
          (when (.node_exists cluster-state path false)
            (dofor [c (.get_children cluster-state path false)]
              (let [raw (.get_data cluster-state (str path "/" c) false)
                    request (maybe-deserialize raw ProfileRequest)]
                (if thrift?
                  request
                  (clojurify-profile-request request)))))))

      (delete-topology-profile-requests
        [this storm-id profile-request]
        ;; profile-request is the clojurified map form (:action/:host/:port).
        (let [action (:action profile-request)
              host (:host profile-request)
              port (:port profile-request)]
          (.delete_node cluster-state
                        (profiler-config-path storm-id host port action))))

      (get-worker-profile-requests
        [this storm-id node-info thrift?]
        (let [host (:host node-info)
              port (:port node-info)
              profile-requests (get-topology-profile-requests this storm-id thrift?)]
          (if thrift?
            (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port (first (.get_port (.get_nodeInfo %)))))
                    profile-requests)
            (filter #(and (= host (:host %)) (= port (:port %)))
                    profile-requests))))

      (worker-heartbeat!
        [this storm-id node port info]
        (when-let [thrift-worker-hb (thriftify-zk-worker-hb info)]
          (.set_worker_hb cluster-state (workerbeat-path storm-id node port) (Utils/serialize thrift-worker-hb) acls)))

      (remove-worker-heartbeat!
        [this storm-id node port]
        (.delete_worker_hb cluster-state (workerbeat-path storm-id node port)))

      (setup-heartbeats!
        [this storm-id]
        (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls))

      (teardown-heartbeats!
        [this storm-id]
        (try-cause
          (.delete_worker_hb cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))

      (worker-backpressure!
        [this storm-id node port on?]
        ;; if znode exists and to be not on?, delete; if exists and on?, do nothing;
        ;; if not exists and to be on?, create; if not exists and not on?, do nothing
        (let [path (backpressure-path storm-id node port)
              existed (.node_exists cluster-state path false)]
          (if existed
            (when-not on?
              (.delete_node cluster-state path)) ;; delete the znode since the worker is not congested
            (when on?
              (.set_ephemeral_node cluster-state path nil acls))))) ;; create the znode since worker is congested

      (topology-backpressure
        [this storm-id callback]
        ;; Returns true (throttle on) when the backpressure/storm-id dir has at least
        ;; one child, i.e. some worker created its znode via worker-backpressure!;
        ;; returns false when the dir is empty.
        (when callback
          (swap! backpressure-callback assoc storm-id callback))
        (let [path (backpressure-storm-root storm-id)
              children (.get_children cluster-state path (not-nil? callback))]
          (> (count children) 0)))

      (setup-backpressure!
        [this storm-id]
        (.mkdirs cluster-state (backpressure-storm-root storm-id) acls))

      (remove-worker-backpressure!
        [this storm-id node port]
        (.delete_node cluster-state (backpressure-path storm-id node port)))

      (teardown-topology-errors!
        [this storm-id]
        (try-cause
          (.delete_node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))

      (supervisor-heartbeat!
        [this supervisor-id info]
        (let [thrift-supervisor-info (thriftify-supervisor-info info)]
          (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls)))

      (activate-storm!
        [this storm-id storm-base]
        (let [thrift-storm-base (thriftify-storm-base storm-base)]
          (.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls)))

      (update-storm!
        [this storm-id new-elems]
        ;; Read-modify-write of the StormBase: :component->executors is merged with
        ;; the new values winning, :component->debug is merged per component.
        ;; NOTE(review): the read and the write are not versioned, so concurrent
        ;; updaters can overwrite each other.
        (let [base (storm-base this storm-id nil)
              executors (:component->executors base)
              component->debug (:component->debug base)
              new-elems (update new-elems :component->executors (partial merge executors))
              new-elems (update new-elems :component->debug (partial merge-with merge component->debug))]
          (.set_data cluster-state (storm-path storm-id)
                     (-> base
                         (merge new-elems)
                         thriftify-storm-base
                         Utils/serialize)
                     acls)))

      (storm-base
        [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (clojurify-storm-base (maybe-deserialize (.get_data cluster-state (storm-path storm-id) (not-nil? callback)) StormBase)))

      (remove-storm-base!
        [this storm-id]
        (.delete_node cluster-state (storm-path storm-id)))

      (set-assignment!
        [this storm-id info]
        (let [thrift-assignment (thriftify-assignment info)]
          (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))

      (remove-blobstore-key!
        [this blob-key]
        (log-debug "removing key" blob-key)
        (.delete_node cluster-state (blobstore-path blob-key)))

      (remove-key-version!
        [this blob-key]
        (.delete_node cluster-state (blobstore-max-key-sequence-number-path blob-key)))

      (remove-storm!
        [this storm-id]
        (.delete_node cluster-state (assignment-path storm-id))
        (.delete_node cluster-state (credentials-path storm-id))
        (.delete_node cluster-state (log-config-path storm-id))
        (.delete_node cluster-state (profiler-config-path storm-id))
        (remove-storm-base! this storm-id))

      (set-credentials!
        [this storm-id creds topo-conf]
        (let [topo-acls (mk-topo-only-acls topo-conf)
              path (credentials-path storm-id)
              thriftified-creds (thriftify-credentials creds)]
          (.set_data cluster-state path (Utils/serialize thriftified-creds) topo-acls)))

      (credentials
        [this storm-id callback]
        (when callback
          (swap! credentials-callback assoc storm-id callback))
        (clojurify-crdentials (maybe-deserialize (.get_data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials)))

      (report-error
        [this storm-id component-id node port error]
        ;; Appends the error as a new sequential child, overwrites the component's
        ;; last-error node, then prunes everything but the 10 newest children.
        (let [path (error-path storm-id component-id)
              last-error-path (last-error-path storm-id component-id)
              data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
              ser-data (Utils/serialize data)]
          (.mkdirs cluster-state path acls)
          (.create_sequential cluster-state (str path "/e") ser-data acls)
          (.set_data cluster-state last-error-path ser-data acls)
          (let [to-kill (->> (.get_children cluster-state path false)
                             (sort-by parse-error-path)
                             reverse
                             (drop 10))]
            (doseq [k to-kill]
              (.delete_node cluster-state (str path "/" k))))))

      (errors
        [this storm-id component-id]
        ;; Returns TaskError records sorted newest first; empty when the dir is missing.
        (let [path (error-path storm-id component-id)
              errors (if (.node_exists cluster-state path false)
                       (dofor [c (.get_children cluster-state path false)]
                         (if-let [data (-> (.get_data cluster-state
                                                      (str path "/" c)
                                                      false)
                                           (maybe-deserialize ErrorInfo)
                                           clojurify-error)]
                           (map->TaskError data)))
                       ())]
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))

      (last-error
        [this storm-id component-id]
        (let [path (last-error-path storm-id component-id)]
          (when (.node_exists cluster-state path false)
            (when-let [data (-> (.get_data cluster-state path false)
                                (maybe-deserialize ErrorInfo)
                                clojurify-error)]
              (map->TaskError data)))))

      (disconnect
        [this]
        (.unregister cluster-state state-id)
        ;; only close state that this function created itself
        (when solo?
          (.close cluster-state))))))

;; daemons have a single thread that will respond to events
;; start with initialize event
;; callbacks add events to the thread's queue

;; keeps in memory cache of the state, only for what client subscribes to. Any subscription is automatically kept in sync, and when there are changes, client is notified.
;; master gives orders through state, and client records status in state (ephemerally)

;; master tells nodes what workers to launch

;; master writes this. supervisors and workers subscribe to this to understand complete topology. each storm is a map from nodes to workers to tasks to ports whenever topology changes everyone will be notified
;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up
;; /assignments/{storm id}

;; which tasks they talk to, etc. (immutable until shutdown)
;; everyone reads this in full to understand structure
;; /tasks/{storm id}/{task id} ; just contains bolt id

;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously
;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here

;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously
;; /taskbeats/{storm id}/{ephemeral task id}

;; contains data about whether it's started or not, tasks and workers subscribe to specific storm here to know when to shutdown
;; master manipulates
;; /storms/{storm id}

;; Zookeeper flows:

;; Master:
;; job submit:
;; 1. read which nodes are available
;; 2. set up the worker/{storm}/{task} stuff (static)
;; 3. set assignments
;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)

;; Monitoring (or by checking when nodes go down or heartbeats aren't received):
;; 1. read assignment
;; 2. see which tasks/nodes are up
;; 3. make new assignment to fix any problems
;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)

;; master's only possible watches are on ephemeral nodes and tasks, and maybe not even

;; Supervisor:
;; 1. monitor /storms/* and assignments
;; 2. local state about which workers are local
;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments
;; 4. when storm is off, monitor tasks for workers - when they all die or don't heartbeat, kill the process and cleanup

;; Worker:
;; 1. On startup, start the tasks if the storm is on

;; Task:
;; 1. monitor assignments, reroute when assignments change
;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off

;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name)
;; supervisor periodically checks to make sure processes are alive
;; {rootdir}/workers/{storm id}/{worker id} ;; contains pid inside

;; all tasks in a worker share the same cluster state
;; workers, supervisors, and tasks subscribe to storm to know when it's started or stopped
;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear)
;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9)
;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. when node disappears, they can die
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj new file mode 100644 index 0000000..3104c52 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj @@ -0,0 +1,161 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
(ns org.apache.storm.cluster-state.zookeeper-state-factory
  (:import [org.apache.curator.framework.state ConnectionStateListener])
  (:import [org.apache.zookeeper KeeperException$NoNodeException]
           [org.apache.storm.cluster ClusterState DaemonType])
  (:use [org.apache.storm cluster config log util])
  (:require [org.apache.storm [zookeeper :as zk]])
  (:gen-class
    :implements [org.apache.storm.cluster.ClusterStateFactory]))

(defn- mk-watcher
  "Returns a ZooKeeper watcher fn shared by the writer and reader clients.
  While @active is true it logs a warning for any state other than :connected
  and forwards every event whose type is not :none to all callbacks currently
  registered in the `callbacks` atom. `client-label` only appears in the log line."
  [active callbacks client-label]
  (fn [state type path]
    (when @active
      (when-not (= :connected state)
        (log-warn "Received event " state ":" type ":" path " with disconnected " client-label " Zookeeper."))
      (when-not (= :none type)
        (doseq [callback (vals @callbacks)]
          (callback type path))))))

(defn -mkState
  "ClusterStateFactory.mkState: ensures STORM-ZOOKEEPER-ROOT exists, then returns
  a ClusterState backed by a writer client and, on Nimbus only, a separate
  reader client (other daemons read through the writer client)."
  [this conf auth-conf acls context]
  ;; Bootstrap client without :root, used only to create the root path; closed
  ;; even if mkdirs throws so the connection is not leaked.
  (let [bootstrap-zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
    (try
      (zk/mkdirs bootstrap-zk (conf STORM-ZOOKEEPER-ROOT) acls)
      (finally
        (.close bootstrap-zk))))
  (let [callbacks (atom {})
        active (atom true)
        zk-writer (zk/mk-client conf
                                (conf STORM-ZOOKEEPER-SERVERS)
                                (conf STORM-ZOOKEEPER-PORT)
                                :auth-conf auth-conf
                                :root (conf STORM-ZOOKEEPER-ROOT)
                                :watcher (mk-watcher active callbacks "Writer"))
        is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS)
        zk-reader (if is-nimbus?
                    (zk/mk-client conf
                                  (conf STORM-ZOOKEEPER-SERVERS)
                                  (conf STORM-ZOOKEEPER-PORT)
                                  :auth-conf auth-conf
                                  :root (conf STORM-ZOOKEEPER-ROOT)
                                  :watcher (mk-watcher active callbacks "Reader"))
                    zk-writer)]
    (reify
      ClusterState

      (register
        [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))

      (unregister
        [this id]
        (swap! callbacks dissoc id))

      (set-ephemeral-node
        [this path data acls]
        (zk/mkdirs zk-writer (parent-path path) acls)
        (if (zk/exists zk-writer path false)
          (try-cause
            (zk/set-data zk-writer path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk-writer path data :ephemeral acls)))
          (zk/create-node zk-writer path data :ephemeral acls)))

      (create-sequential
        [this path data acls]
        (zk/create-node zk-writer path data :sequential acls))

      (set-data
        [this path data acls]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk-writer path false)
          (zk/set-data zk-writer path data)
          (do
            (zk/mkdirs zk-writer (parent-path path) acls)
            (zk/create-node zk-writer path data :persistent acls))))

      (set-worker-hb
        [this path data acls]
        (.set_data this path data acls))

      (delete-node
        [this path]
        (zk/delete-node zk-writer path))

      (delete-worker-hb
        [this path]
        (.delete_node this path))

      (get-data
        [this path watch?]
        (zk/get-data zk-reader path watch?))

      (get-data-with-version
        [this path watch?]
        (zk/get-data-with-version zk-reader path watch?))

      (get-version
        [this path watch?]
        (zk/get-version zk-reader path watch?))

      (get-worker-hb
        [this path watch?]
        (.get_data this path watch?))

      (get-children
        [this path watch?]
        (zk/get-children zk-reader path watch?))

      (get-worker-hb-children
        [this path watch?]
        (.get_children this path watch?))

      (mkdirs
        [this path acls]
        (zk/mkdirs zk-writer path acls))

      (node-exists
        [this path watch?]
        (zk/exists-node? zk-reader path watch?))

      (add-listener
        [this listener]
        ;; NOTE(review): this forwards two args (client, Curator newState), but the
        ;; ClusterStateListener reified in org.apache.storm.cluster/add-nimbus-host!
        ;; implements stateChanged with a single org.apache.storm.cluster.ConnectionState
        ;; argument. Confirm ClusterStateListener's actual signature.
        (let [curator-listener (reify ConnectionStateListener
                                 (stateChanged
                                   [this client newState]
                                   (.stateChanged listener client newState)))]
          (zk/add-listener zk-reader curator-listener)))

      (sync-path
        [this path]
        (zk/sync-path zk-writer path))

      (delete-node-blobstore
        [this path nimbus-host-port-info]
        (zk/delete-node-blobstore zk-writer path nimbus-host-port-info))

      (close
        [this]
        ;; stop forwarding watcher events before closing the clients
        (reset! active false)
        (.close zk-writer)
        ;; zk-reader is a distinct client only on Nimbus
        (when is-nimbus?
          (.close zk-reader))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/activate.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/activate.clj b/storm-core/src/clj/org/apache/storm/command/activate.clj new file mode 100644 index 0000000..dc452e8 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/activate.clj @@ -0,0 +1,24 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License.
+(ns org.apache.storm.command.activate + (:use [org.apache.storm thrift log]) + (:gen-class)) + +(defn -main + "Ask the configured Nimbus to activate the topology called topology-name, then log that it was activated." + [topology-name] + (with-configured-nimbus-connection client + (.activate client topology-name) + (log-message "Activated topology: " topology-name))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/blobstore.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/blobstore.clj b/storm-core/src/clj/org/apache/storm/command/blobstore.clj new file mode 100644 index 0000000..b1496db --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/blobstore.clj @@ -0,0 +1,162 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License.
+(ns org.apache.storm.command.blobstore + (:import [java.io InputStream OutputStream] + [org.apache.storm.generated SettableBlobMeta AccessControl AuthorizationException + KeyNotFoundException] + [org.apache.storm.blobstore BlobStoreAclHandler]) + (:use [org.apache.storm config] + [clojure.string :only [split]] + [clojure.tools.cli :only [cli]] + [clojure.java.io :only [copy input-stream output-stream]] + [org.apache.storm blobstore log util]) + (:gen-class)) + +(defn update-blob-from-stream + "Update a blob in the blob store from an InputStream" + [key ^InputStream in] + (with-configured-blob-client blobstore + (let [out (.updateBlob blobstore key)] + (try + (copy in out) + (.close out) + (catch Exception e + (log-message e) + (.cancel out) + (throw e)))))) + +(defn create-blob-from-stream + "Create a blob in the blob store from an InputStream" + [key ^InputStream in ^SettableBlobMeta meta] + (with-configured-blob-client blobstore + (let [out (.createBlob blobstore key meta)] + (try + (copy in out) + (.close out) + (catch Exception e + (.cancel out) + (throw e)))))) + +(defn read-blob + "Read a blob in the blob store and write to an OutputStream" + [key ^OutputStream out] + (with-configured-blob-client blobstore + (with-open [in (.getBlob blobstore key)] + (copy in out)))) + +(defn as-access-control + "Convert a parameter to an AccessControl object" + [param] + (BlobStoreAclHandler/parseAccessControl (str param))) + +(defn as-acl + "Split a comma-separated ACL string and parse each entry into an AccessControl." + [param] + (map as-access-control (split param #","))) + +(defn access-control-str + "Render an AccessControl back to its string form via BlobStoreAclHandler." + [^AccessControl acl] + (BlobStoreAclHandler/accessControlToString acl)) + +(defn read-cli + "CLI 'cat': write blob KEY to the file given by -f/--file, or to stdout when no file is given." + [args] + (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])] + (if file + (with-open [f (output-stream file)] + (read-blob key f)) + (read-blob key System/out)))) + +(defn update-cli + "CLI 'update': replace blob KEY with the contents of -f/--file, or of stdin when no file is given." + [args] + (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])] + (if file + (with-open [f (input-stream file)] + (update-blob-from-stream key f)) +
(update-blob-from-stream key System/in)) + (log-message "Successfully updated " key))) + +(defn create-cli + "CLI 'create': validate KEY, then upload -f/--file (or stdin) as a new blob with the -a/--acl ACL and -r/--replication-factor." + [args] + (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil] + ["-a" "--acl" :default [] :parse-fn as-acl] + ["-r" "--replication-factor" :default -1 :parse-fn parse-int]) + meta (doto (SettableBlobMeta. acl) + (.set_replication_factor replication-factor))] + (validate-key-name! key) + (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl))) + (if file + (with-open [f (input-stream file)] + (create-blob-from-stream key f meta)) + (create-blob-from-stream key System/in meta)) + (log-message "Successfully created " key))) + +(defn delete-cli + "CLI 'delete': delete every key given in args, logging each one." + [args] + (with-configured-blob-client blobstore + (doseq [key args] + (.deleteBlob blobstore key) + (log-message "deleted " key)))) + +(defn list-cli + "CLI 'list': log key, version and ACL for the given keys, or for every key in the store when none are given." + [args] + (with-configured-blob-client blobstore + (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)] + (doseq [key keys] + (try + (let [meta (.getBlobMeta blobstore key) + version (.get_version meta) + acl (.get_acl (.get_settable meta))] + (log-message key " " version " " (pr-str (map access-control-str acl)))) + ;; Errors are only reported for keys the user named explicitly. + ;; NOTE(review): log-error appears to take the throwable first (like + ;; log-warn-error e ...), so pass the caught exception, not a bare string. + (catch AuthorizationException ae + (if-not (empty? args) (log-error ae "ACCESS DENIED to key: " key))) + (catch KeyNotFoundException knf + (if-not (empty? args) (log-error knf key " NOT FOUND")))))))) + +(defn set-acl-cli + "CLI 'set-acl': replace the ACL of blob KEY with the -s/--set ACL (an empty ACL when -s is omitted, since the option defaults to [])." + [args] + (let [[{set-acl :set} [key] _] + (cli args ["-s" "--set" :default [] :parse-fn as-acl])] + (with-configured-blob-client blobstore + (let [meta (.getBlobMeta blobstore key) + acl (.get_acl (.get_settable meta)) + new-acl (if set-acl set-acl acl) + new-meta (SettableBlobMeta.
new-acl)] + (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl))) + (.setBlobMeta blobstore key new-meta))))) + +(defn rep-cli + "CLI 'replication': --read logs and returns the current replication factor of KEY; --update -r N sets it and returns the new value." + [args] + (let [sub-command (first args) + new-args (rest args)] + (with-configured-blob-client blobstore + ;; The trailing lone expression is condp's default clause; a leading :else + ;; would be compared against sub-command like any other test value. + (condp = sub-command + "--read" (let [key (first new-args) + blob-replication (.getBlobReplication blobstore key)] + (log-message "Current replication factor " blob-replication) + blob-replication) + "--update" (let [[{replication-factor :replication-factor} [key] _] + (cli new-args ["-r" "--replication-factor" :parse-fn parse-int])] + (if (nil? replication-factor) + (throw (RuntimeException. "Please set the replication factor")) + (let [blob-replication (.updateBlobReplication blobstore key replication-factor)] + (log-message "Replication factor is set to " blob-replication) + blob-replication))) + (throw (RuntimeException. (str sub-command " is not a supported blobstore command"))))))) + +(defn -main + "Dispatch a blobstore sub-command (cat, create, update, delete, list, set-acl, replication) with the remaining args." + [& args] + (let [command (first args) + new-args (rest args)] + (condp = command + "cat" (read-cli new-args) + "create" (create-cli new-args) + "update" (update-cli new-args) + "delete" (delete-cli new-args) + "list" (list-cli new-args) + "set-acl" (set-acl-cli new-args) + "replication" (rep-cli new-args) + ;; Default clause (no :else keyword): reached when no command matches. + (throw (RuntimeException. (str command " is not a supported blobstore command")))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/config_value.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/config_value.clj b/storm-core/src/clj/org/apache/storm/command/config_value.clj new file mode 100644 index 0000000..9bc3e92 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/config_value.clj @@ -0,0 +1,24 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements.
See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.config-value + (:use [org.apache.storm config log]) + (:gen-class)) + + +(defn -main + "Look up config-key in the Storm configuration from read-storm-config and print it prefixed with VALUE:." + [^String config-key] + (let [value ((read-storm-config) config-key)] + (println "VALUE:" value))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/deactivate.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/deactivate.clj b/storm-core/src/clj/org/apache/storm/command/deactivate.clj new file mode 100644 index 0000000..4fd2c85 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/deactivate.clj @@ -0,0 +1,24 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License.
You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.deactivate + (:use [org.apache.storm thrift log]) + (:gen-class)) + +(defn -main + "Ask the configured Nimbus to deactivate the topology called topology-name, then log that it was deactivated." + [topology-name] + (with-configured-nimbus-connection client + (.deactivate client topology-name) + (log-message "Deactivated topology: " topology-name))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj new file mode 100644 index 0000000..96de02d --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj @@ -0,0 +1,26 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License.
+(ns org.apache.storm.command.dev-zookeeper + (:use [org.apache.storm zookeeper util config]) + (:gen-class)) + +(defn -main + "Remove the DEV-ZOOKEEPER-PATH directory with rmr, then start an in-process ZooKeeper rooted there on STORM-ZOOKEEPER-PORT." + [& args] + (let [conf (read-storm-config) + data-dir (conf DEV-ZOOKEEPER-PATH)] + (rmr data-dir) + (mk-inprocess-zookeeper data-dir :port (conf STORM-ZOOKEEPER-PORT)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/get_errors.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/get_errors.clj b/storm-core/src/clj/org/apache/storm/command/get_errors.clj new file mode 100644 index 0000000..c267390 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/get_errors.clj @@ -0,0 +1,52 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License.
+(ns org.apache.storm.command.get-errors + (:use [clojure.tools.cli :only [cli]]) + (:use [org.apache.storm thrift log]) + (:use [org.apache.storm util]) + (:require [org.apache.storm.daemon + [nimbus :as nimbus] + [common :as common]]) + (:import [org.apache.storm.generated GetInfoOptions NumErrorsChoice + TopologySummary ErrorInfo]) + (:gen-class)) + +(defn get-topology-id + "Return the id of the first topology summary whose name equals name, or nil when there is none." + [name topologies] + (when-let [topology (first (filter #(= (.get_name %) name) topologies))] + (.get_id topology))) + +(defn get-component-errors + "Map each component name to the message of the first ErrorInfo in its error list; components with no errors are omitted." + [topology-errors] + ;; Build the map directly: the old flatten + remove nil? + hash-map form + ;; dropped a nil message, leaving an odd item count that made hash-map throw. + (into {} (for [[comp-name comp-errors] topology-errors + :let [latest-error (first comp-errors)] + :when latest-error] + [comp-name (.get_error ^ErrorInfo latest-error)]))) + +(defn -main + "Print as JSON the first reported error of each component of the running topology called name (requested with NumErrorsChoice/ONE), or a failure message when no such topology exists." + [name] + (with-configured-nimbus-connection nimbus + (let [opts (doto (GetInfoOptions.) + (.set_num_err_choice NumErrorsChoice/ONE)) + cluster-info (.getClusterInfo nimbus) + topologies (.get_topologies cluster-info) + topo-id (get-topology-id name topologies) + topo-info (when topo-id (.getTopologyInfoWithOpts nimbus topo-id opts))] + (if (or (nil? topo-id) (nil?
topo-info)) + (println (to-json {"Failure" (str "No topologies running with name " name)})) + (let [topology-name (.get_name topo-info) + topology-errors (.get_errors topo-info)] + (println (to-json (hash-map + "Topology Name" topology-name + "Comp-Errors" (get-component-errors topology-errors))))))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/healthcheck.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj new file mode 100644 index 0000000..d96d7b3 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj @@ -0,0 +1,88 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.healthcheck + (:require [org.apache.storm + [config :refer :all] + [log :refer :all]] + [clojure.java [io :as io]] + [clojure [string :refer [split]]]) + (:gen-class)) + +(defn interrupter + "Interrupt a given thread after ms milliseconds." + [thread ms] + (let [interrupter (Thread.
+ (fn [] + (try + (Thread/sleep ms) + (.interrupt thread) + (catch InterruptedException e))))] + (.start interrupter) + interrupter)) + +(defn check-output + "Return :failed when any line starts with \"ERROR\", otherwise :success." + [lines] + (if (some #(.startsWith % "ERROR") lines) + :failed + :success)) + +(defn process-script + "Run script with a STORM-HEALTH-CHECK-TIMEOUT-MS watchdog. Returns :timeout, :failed_with_exit_code (non-zero exit), :failed_with_exception, or check-output's verdict on the script's stdout." + [conf script] + (let [script-proc (. (Runtime/getRuntime) (exec script)) + curthread (Thread/currentThread) + interrupter-thread (interrupter curthread + (conf STORM-HEALTH-CHECK-TIMEOUT-MS))] + (try + (.waitFor script-proc) + ;; Stop the watchdog as soon as the script exits, before reading its output. + (.interrupt interrupter-thread) + (if (not= (.exitValue script-proc) 0) + :failed_with_exit_code + (check-output (split + (slurp (.getInputStream script-proc)) + #"\n+"))) + (catch InterruptedException e + (println "Script" script "timed out.") + :timeout) + (catch Exception e + (println "Script failed with exception: " e) + :failed_with_exception) + (finally + (.interrupt interrupter-thread) + ;; On timeout the child is still running and was previously leaked; + ;; destroying a process that has already exited should be harmless. + (.destroy script-proc))))) + +(defn health-check + "Run every executable, non-directory file under the health-check directory; return 0 when every result is :success or :failed_with_exit_code, otherwise 1." + [conf] + (let [health-dir (absolute-healthcheck-dir conf) + health-files (file-seq (io/file health-dir)) + health-scripts (filter #(and (.canExecute %) + (not (.isDirectory %))) + health-files) + script-paths (map #(.getAbsolutePath %) health-scripts) + results (map (partial process-script conf) script-paths)] + (log-message + (pr-str (map vector script-paths results))) + ; failed_with_exit_code is OK. We're mimicking Hadoop's health checks. + ; We treat non-zero exit codes as indicators that the scripts failed + ; to execute properly, not that the system is unhealthy, in which case + ; we don't want to start killing things. + (if (every?
#(or (= % :failed_with_exit_code) + (= % :success)) + results) + 0 + 1))) + +(defn -main + "Run the health checks and exit the JVM with health-check's status code." + [& args] + (let [conf (read-storm-config)] + (System/exit + (health-check conf)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/heartbeats.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj new file mode 100644 index 0000000..ff28cba --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj @@ -0,0 +1,52 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License.
+(ns org.apache.storm.command.heartbeats + (:require [org.apache.storm + [config :refer :all] + [log :refer :all] + [cluster :refer :all] + [converter :refer :all]] + [clojure.string :refer :all]) + (:import [org.apache.storm.generated ClusterWorkerHeartbeat] + [org.apache.storm.utils Utils]) + (:gen-class)) + +(defn -main + "Inspect worker heartbeats under path: \"list\" logs its children, \"get\" logs the heartbeat stored there (deserialized and passed through clojurify-zk-worker-hb) or \"Nothing\"; any other command logs usage. The cluster state is always closed, then the JVM exits with status 0." + [command path & args] + (let [conf (read-storm-config) + cluster (mk-distributed-cluster-state conf :auth-conf conf)] + ;; try/finally so the cluster state is closed even if a lookup or + ;; deserialization throws; previously an exception skipped .close. + (try + (println "Command: [" command "]") + (condp = command + "list" + (let [message (join " \n" (.get_worker_hb_children cluster path false))] + (log-message "list " path ":\n" + message "\n")) + "get" + (log-message + (if-let [hb (.get_worker_hb cluster path false)] + (clojurify-zk-worker-hb + (Utils/deserialize + hb + ClusterWorkerHeartbeat)) + "Nothing")) + + (log-message "Usage: heartbeats [list|get] path")) + (finally + (try + (.close cluster) + (catch Exception e + (log-message "Caught exception: " e " on close.")))))) + (System/exit 0)) + http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/kill_topology.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj new file mode 100644 index 0000000..84e0a64 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj @@ -0,0 +1,29 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License.
You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.kill-topology + (:use [clojure.tools.cli :only [cli]]) + (:use [org.apache.storm thrift config log]) + (:import [org.apache.storm.generated KillOptions]) + (:gen-class)) + +(defn -main + "Kill the named topology through Nimbus; -w/--wait N is passed to KillOptions via set_wait_secs." + [& args] + (let [[{:keys [wait]} [topology-name] _] + (cli args ["-w" "--wait" :default nil :parse-fn (fn [^String s] (Integer/parseInt s))]) + kill-opts (KillOptions.)] + (when wait + (.set_wait_secs kill-opts wait)) + (with-configured-nimbus-connection client + (.killTopologyWithOpts client topology-name kill-opts) + (log-message "Killed topology: " topology-name)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/kill_workers.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj new file mode 100644 index 0000000..2670735 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj @@ -0,0 +1,33 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License.
You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.kill-workers + (:import [java.io File]) + (:use [org.apache.storm.daemon common]) + (:use [org.apache.storm util config]) + (:require [org.apache.storm.daemon + [supervisor :as supervisor]]) + (:gen-class)) + +(defn -main + "Rebuild supervisor-data from the local config (with STORM-LOCAL-DIR made canonical) and shut down every worker id returned by my-worker-ids." + [& args] + (let [base-conf (read-storm-config) + local-dir (.getCanonicalPath (File. (base-conf STORM-LOCAL-DIR))) + conf (assoc base-conf STORM-LOCAL-DIR local-dir) + supervisor-data (supervisor/supervisor-data conf nil (supervisor/standalone-supervisor))] + (doseq [worker-id (supervisor/my-worker-ids conf)] + (supervisor/shutdown-worker supervisor-data worker-id)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/list.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/list.clj b/storm-core/src/clj/org/apache/storm/command/list.clj new file mode 100644 index 0000000..87975cd --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/list.clj @@ -0,0 +1,38 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License.
You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.list + (:use [org.apache.storm thrift log]) + (:import [org.apache.storm.generated TopologySummary]) + (:gen-class)) + +(defn -main + "Print a table of the topologies Nimbus reports, or a notice when there are none." + [] + (with-configured-nimbus-connection client + (let [summaries (.get_topologies (.getClusterInfo client)) + row-format "%-20s %-10s %-10s %-12s %-10s"] + (if (seq summaries) + (do + (println (format row-format "Topology_name" "Status" "Num_tasks" "Num_workers" "Uptime_secs")) + (println "-------------------------------------------------------------------") + (doseq [^TopologySummary summary summaries] + (println (format row-format + (.get_name summary) + (.get_status summary) + (.get_num_tasks summary) + (.get_num_workers summary) + (.get_uptime_secs summary))))) + (println "No topologies running."))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/monitor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/monitor.clj b/storm-core/src/clj/org/apache/storm/command/monitor.clj new file mode 100644 index 0000000..7fa9b2a --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/monitor.clj @@ -0,0 +1,37 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license
agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.monitor + (:use [clojure.tools.cli :only [cli]]) + (:use [org.apache.storm.thrift :only [with-configured-nimbus-connection]]) + (:import [org.apache.storm.utils Monitor]) + (:gen-class) + ) + +(defn -main + "Configure a Monitor from -i interval, -m component, -s stream, -w watch and an optional topology name, then call its metrics method with the Nimbus client." + [& args] + (let [[{:keys [interval component stream watch]} [topology-name] _] + (cli args ["-i" "--interval" :default 4 :parse-fn (fn [^String s] (Integer/parseInt s))] + ["-m" "--component" :default nil] + ["-s" "--stream" :default "default"] + ["-w" "--watch" :default "emitted"]) + monitor (Monitor.)] + (when interval (.set_interval monitor interval)) + (when topology-name (.set_topology monitor topology-name)) + (when component (.set_component monitor component)) + (when stream (.set_stream monitor stream)) + (when watch (.set_watch monitor watch)) + (with-configured-nimbus-connection client + (.metrics monitor client)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/rebalance.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/rebalance.clj b/storm-core/src/clj/org/apache/storm/command/rebalance.clj new file mode 100644 index 0000000..3868091 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/rebalance.clj @@ -0,0 +1,46 @@ +;;
Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.rebalance + (:use [clojure.tools.cli :only [cli]]) + (:use [org.apache.storm thrift config log]) + (:import [org.apache.storm.generated RebalanceOptions]) + (:gen-class)) + +(defn- parse-executor + "Parse a component=count executor override into {component count}. The last '=' splits the two parts, so a component name may itself contain '='." + [^String s] + (let [eq-pos (.lastIndexOf s "=")] + ;; Without this check a missing '=' made .substring throw an opaque + ;; StringIndexOutOfBoundsException. + (when (neg? eq-pos) + (throw (IllegalArgumentException. (str "Expected component=count but got: " s)))) + {(.substring s 0 eq-pos) (Integer/parseInt (.substring s (inc eq-pos)))})) + +(defn -main + "Rebalance the named topology through Nimbus; -w wait seconds, -n worker count and repeatable -e component=count overrides are placed in RebalanceOptions." + [& args] + (let [[{wait :wait executor :executor num-workers :num-workers} [name] _] + (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)] + ["-n" "--num-workers" :default nil :parse-fn #(Integer/parseInt %)] + ["-e" "--executor" :parse-fn parse-executor + ;; Repeated -e flags are merged into one component->count map. + :assoc-fn (fn [previous key val] + (assoc previous key + (if-let [oldval (get previous key)] + (merge oldval val) + val)))]) + opts (RebalanceOptions.)] + (when wait (.set_wait_secs opts wait)) + (when executor (.set_num_executors opts executor)) + (when num-workers (.set_num_workers opts num-workers)) + (with-configured-nimbus-connection nimbus + (.rebalance nimbus name opts) + (log-message "Topology " name " is rebalancing"))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/set_log_level.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj new file mode 100644 index 0000000..7e1c3c5 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj @@ -0,0 +1,75 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.command.set-log-level + (:use [clojure.tools.cli :only [cli]]) + (:use [org.apache.storm thrift log]) + (:import [org.apache.logging.log4j Level]) + (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction]) + (:gen-class)) + +(defn- get-storm-id + "Get topology id for a running topology from the topology name."
+ [nimbus name] + (let [info (.getClusterInfo nimbus) + topologies (.get_topologies info) + topology (first (filter (fn [topo] (= name (.get_name topo))) topologies))] + (if topology + (.get_id topology) + ;; Construct the exception with (Class. ...); (.IllegalArgumentException s) + ;; was a method call on the message string and failed with a reflection error. + (throw (IllegalArgumentException. (str name " is not a running topology")))))) + +(defn- parse-named-log-levels + "Parses [logger name]=[level string]:[optional timeout],[logger name2]... + + e.g. ROOT=DEBUG:30 + root logger, debug for 30 seconds + + org.apache.foo=WARN + org.apache.foo set to WARN indefinitely" + [action] + (fn [^String s] + (let [log-level (LogLevel.)] + (.set_action log-level action) + (if (= action LogLevelAction/REMOVE) + ;; Removal only needs the logger name, which is the whole argument. + {s log-level} + (let [[_ logger-name level-str timeout-str] (re-find #"(.*)=([A-Z]+):?(\d*)" s)] + ;; Reject malformed settings instead of silently producing an entry + ;; keyed by a nil logger name. + (when-not logger-name + (throw (IllegalArgumentException. (str "Invalid log level setting: " s)))) + (.set_target_log_level log-level (.toString (Level/toLevel ^String level-str))) + (.set_reset_log_level_timeout_secs log-level + (Integer/parseInt (if (= timeout-str "") "0" timeout-str))) + {logger-name log-level}))))) + +(defn- merge-together + "tools.cli assoc-fn: merge repeated {logger LogLevel} maps stored under the same option key." + [previous key val] + (assoc previous key + (if-let [oldval (get previous key)] + (merge oldval val) + val))) + +(defn -main + "Send the -l (update) and -r (remove) named-logger settings to Nimbus as a LogConfig for the topology called name." + [& args] + (let [[{log-setting :log-setting remove-log-setting :remove-log-setting} [name] _] + (cli args ["-l" "--log-setting" + :parse-fn (parse-named-log-levels LogLevelAction/UPDATE) + :assoc-fn merge-together] + ["-r" "--remove-log-setting" + :parse-fn (parse-named-log-levels LogLevelAction/REMOVE) + :assoc-fn merge-together]) + log-config (LogConfig.)] + (doseq [[log-name log-val] (merge log-setting remove-log-setting)] + (.put_to_named_logger_level log-config log-name log-val)) + (with-configured-nimbus-connection nimbus + (.setLogConfig nimbus (get-storm-id nimbus name) log-config) + ;; Log only after the call returned, so "Sent" is accurate. + (log-message "Sent log config " log-config " for topology " name)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj new file mode 100644 index 0000000..b09c4f7 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@ -0,0 +1,33 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.command.shell-submission
  (:import [org.apache.storm StormSubmitter])
  (:use [org.apache.storm thrift util config log zookeeper])
  (:require [clojure.string :as str])
  (:gen-class))


(defn -main
  "Looks up the current leader nimbus, uploads the jar at `tmpjarpath` with
  StormSubmitter/submitJar, then passes `args` followed by the leader's host,
  port and the uploaded jar path, joined with spaces, to exec-command!."
  [^String tmpjarpath & args]
  (let [conf (read-storm-config)
        ;; close the elector even when .getLeader throws; previously it leaked.
        leader-nimbus (let [leader-elector (zk-leader-elector conf)]
                        (try
                          (.getLeader leader-elector)
                          (finally
                            (.close leader-elector))))
        host (.getHost leader-nimbus)
        port (.getPort leader-nimbus)
        jarpath (StormSubmitter/submitJar conf tmpjarpath)
        args (concat args [host port jarpath])]
    ;; NOTE(review): arguments are space-joined, so an argument containing a
    ;; space will not survive as a single argument — confirm exec-command! semantics.
    (exec-command! (str/join " " args))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/command/upload_credentials.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/upload_credentials.clj b/storm-core/src/clj/org/apache/storm/command/upload_credentials.clj new file mode 100644 index 0000000..f63bde4 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/command/upload_credentials.clj @@ -0,0 +1,35 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.command.upload-credentials
  (:use [clojure.tools.cli :only [cli]])
  (:use [org.apache.storm log util])
  (:import [org.apache.storm StormSubmitter])
  (:import [java.util Properties])
  (:import [java.io FileReader])
  (:gen-class))

(defn read-map
  "Loads `file-name` as a java.util.Properties file and returns it converted
  with clojurify-structure. The reader is closed once loading finishes."
  [file-name]
  (let [props (Properties.)]
    ;; with-open closes the FileReader; the previous version never closed it.
    (with-open [reader (FileReader. file-name)]
      (.load props reader))
    (clojurify-structure props)))

(defn -main
  "Command-line entry point: `[-f file] topology-name [key value ...]`.
  Credentials come from the optional properties file plus the key/value pairs
  on the command line (command-line pairs win on conflicting keys) and are
  pushed to the named topology with StormSubmitter/pushCredentials.
  Throws RuntimeException when an odd number of key/value arguments is given."
  [& args]
  (let [[{cred-file :file} [name & raw-creds]] (cli args ["-f" "--file" :default nil])
        _ (when (and raw-creds (odd? (count raw-creds)))
            (throw (RuntimeException. "Need an even number of arguments to make a map")))
        mapping (if raw-creds (apply assoc {} raw-creds) {})
        file-mapping (if (nil? cred-file) {} (read-map cred-file))]
    (StormSubmitter/pushCredentials name {} (merge file-mapping mapping))
    (log-message "Uploaded new creds to topology: " name)))
