http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj 
b/storm-core/src/clj/backtype/storm/cluster.clj
deleted file mode 100644
index 914fa5b..0000000
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ /dev/null
@@ -1,708 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.cluster
-  (:import [org.apache.zookeeper.data Stat ACL Id]
-           [org.apache.storm.generated SupervisorInfo Assignment StormBase 
ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
-            LogConfig ProfileAction ProfileRequest NodeInfo]
-           [java.io Serializable])
-  (:import [org.apache.zookeeper KeeperException 
KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
-  (:import [org.apache.curator.framework CuratorFramework])
-  (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.cluster ClusterState ClusterStateContext 
ClusterStateListener ConnectionState])
-  (:import [java.security MessageDigest])
-  (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
-  (:import [org.apache.storm.nimbus NimbusInfo])
-  (:use [org.apache.storm util log config converter])
-  (:require [org.apache.storm [zookeeper :as zk]])
-  (:require [org.apache.storm.daemon [common :as common]]))
-
-(defn mk-topo-only-acls
-  [topo-conf]
-  (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)]
-    (when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
-      [(first ZooDefs$Ids/CREATOR_ALL_ACL)
-       (ACL. ZooDefs$Perms/READ (Id. "digest" 
(DigestAuthenticationProvider/generateDigest payload)))])))
- 
-(defnk mk-distributed-cluster-state
-  [conf :auth-conf nil :acls nil :context (ClusterStateContext.)]
-  (let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE)
-                                 
"org.apache.storm.cluster_state.zookeeper_state_factory"))
-        state-instance (.newInstance clazz)]
-    (log-debug "Creating cluster state: " (.toString clazz))
-    (or (.mkState state-instance conf auth-conf acls context)
-        nil)))
-
-(defprotocol StormClusterState
-  (assignments [this callback])
-  (assignment-info [this storm-id callback])
-  (assignment-info-with-version [this storm-id callback])
-  (assignment-version [this storm-id callback])
-  ;returns key information under /storm/blobstore/key
-  (blobstore-info [this blob-key])
-  ;returns list of nimbus summaries stored under 
/stormroot/nimbuses/<nimbus-ids> -> <data>
-  (nimbuses [this])
-  ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
-  (add-nimbus-host! [this nimbus-id nimbus-summary])
-
-  (active-storms [this])
-  (storm-base [this storm-id callback])
-  (get-worker-heartbeat [this storm-id node port])
-  (get-worker-profile-requests [this storm-id nodeinfo thrift?])
-  (get-topology-profile-requests [this storm-id thrift?])
-  (set-worker-profile-request [this storm-id profile-request])
-  (delete-topology-profile-requests [this storm-id profile-request])
-  (executor-beats [this storm-id executor->node+port])
-  (supervisors [this callback])
-  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
-  (setup-heartbeats! [this storm-id])
-  (teardown-heartbeats! [this storm-id])
-  (teardown-topology-errors! [this storm-id])
-  (heartbeat-storms [this])
-  (error-topologies [this])
-  (backpressure-topologies [this])
-  (set-topology-log-config! [this storm-id log-config])
-  (topology-log-config [this storm-id cb])
-  (worker-heartbeat! [this storm-id node port info])
-  (remove-worker-heartbeat! [this storm-id node port])
-  (supervisor-heartbeat! [this supervisor-id info])
-  (worker-backpressure! [this storm-id node port info])
-  (topology-backpressure [this storm-id callback])
-  (setup-backpressure! [this storm-id])
-  (remove-backpressure! [this storm-id])
-  (remove-worker-backpressure! [this storm-id node port])
-  (activate-storm! [this storm-id storm-base])
-  (update-storm! [this storm-id new-elems])
-  (remove-storm-base! [this storm-id])
-  (set-assignment! [this storm-id info])
-  ;; sets up information related to key consisting of nimbus
-  ;; host:port and version info of the blob
-  (setup-blobstore! [this key nimbusInfo versionInfo])
-  (active-keys [this])
-  (blobstore [this callback])
-  (remove-storm! [this storm-id])
-  (remove-blobstore-key! [this blob-key])
-  (remove-key-version! [this blob-key])
-  (report-error [this storm-id component-id node port error])
-  (errors [this storm-id component-id])
-  (last-error [this storm-id component-id])
-  (set-credentials! [this storm-id creds topo-conf])
-  (credentials [this storm-id callback])
-  (disconnect [this]))
-
-(def ASSIGNMENTS-ROOT "assignments")
-(def CODE-ROOT "code")
-(def STORMS-ROOT "storms")
-(def SUPERVISORS-ROOT "supervisors")
-(def WORKERBEATS-ROOT "workerbeats")
-(def BACKPRESSURE-ROOT "backpressure")
-(def ERRORS-ROOT "errors")
-(def BLOBSTORE-ROOT "blobstore")
-; Stores the latest update sequence for a blob
-(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber")
-(def NIMBUSES-ROOT "nimbuses")
-(def CREDENTIALS-ROOT "credentials")
-(def LOGCONFIG-ROOT "logconfigs")
-(def PROFILERCONFIG-ROOT "profilerconfigs")
-
-(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
-(def STORMS-SUBTREE (str "/" STORMS-ROOT))
-(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
-(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
-(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
-(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
-;; Blobstore subtree /storm/blobstore
-(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT))
-(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" 
BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT))
-(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
-(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
-(def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT))
-(def PROFILERCONFIG-SUBTREE (str "/" PROFILERCONFIG-ROOT))
-
-(defn supervisor-path
-  [id]
-  (str SUPERVISORS-SUBTREE "/" id))
-
-(defn assignment-path
-  [id]
-  (str ASSIGNMENTS-SUBTREE "/" id))
-
-(defn blobstore-path
-  [key]
-  (str BLOBSTORE-SUBTREE "/" key))
-
-(defn blobstore-max-key-sequence-number-path
-  [key]
-  (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key))
-
-(defn nimbus-path
-  [id]
-  (str NIMBUSES-SUBTREE "/" id))
-
-(defn storm-path
-  [id]
-  (str STORMS-SUBTREE "/" id))
-
-(defn workerbeat-storm-root
-  [storm-id]
-  (str WORKERBEATS-SUBTREE "/" storm-id))
-
-(defn workerbeat-path
-  [storm-id node port]
-  (str (workerbeat-storm-root storm-id) "/" node "-" port))
-
-(defn backpressure-storm-root
-  [storm-id]
-  (str BACKPRESSURE-SUBTREE "/" storm-id))
-
-(defn backpressure-path
-  [storm-id node port]
-  (str (backpressure-storm-root storm-id) "/" node "-" port))
-
-(defn error-storm-root
-  [storm-id]
-  (str ERRORS-SUBTREE "/" storm-id))
-
-(defn error-path
-  [storm-id component-id]
-  (str (error-storm-root storm-id) "/" (url-encode component-id)))
-
-(def last-error-path-seg "last-error")
-
-(defn last-error-path
-  [storm-id component-id]
-  (str (error-storm-root storm-id)
-       "/"
-       (url-encode component-id)
-       "-"
-       last-error-path-seg))
-
-(defn credentials-path
-  [storm-id]
-  (str CREDENTIALS-SUBTREE "/" storm-id))
-
-(defn log-config-path
-  [storm-id]
-  (str LOGCONFIG-SUBTREE "/" storm-id))
-
-(defn profiler-config-path
-  ([storm-id]
-   (str PROFILERCONFIG-SUBTREE "/" storm-id))
-  ([storm-id host port request-type]
-   (str (profiler-config-path storm-id) "/" host "_" port "_" request-type)))
-
-(defn- issue-callback!
-  [cb-atom]
-  (let [cb @cb-atom]
-    (reset! cb-atom nil)
-    (when cb
-      (cb))))
-
-(defn- issue-map-callback!
-  [cb-atom id]
-  (let [cb (@cb-atom id)]
-    (swap! cb-atom dissoc id)
-    (when cb
-      (cb id))))
-
-(defn- maybe-deserialize
-  [ser clazz]
-  (when ser
-    (Utils/deserialize ser clazz)))
-
-(defrecord TaskError [error time-secs host port])
-
-(defn- parse-error-path
-  [^String p]
-  (Long/parseLong (.substring p 1)))
-
-(defn convert-executor-beats
-  "Ensures that we only return heartbeats for executors assigned to
-  this worker."
-  [executors worker-hb]
-  (let [executor-stats (:executor-stats worker-hb)]
-    (->> executors
-         (map (fn [t]
-                (if (contains? executor-stats t)
-                  {t {:time-secs (:time-secs worker-hb)
-                      :uptime (:uptime worker-hb)
-                      :stats (get executor-stats t)}})))
-         (into {}))))
-
-;; Watches should be used for optimization. When ZK is reconnecting, they're 
not guaranteed to be called.
-(defnk mk-storm-cluster-state
-  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
-  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
-                                [false cluster-state-spec]
-                                [true (mk-distributed-cluster-state 
cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
-        assignment-info-callback (atom {})
-        assignment-info-with-version-callback (atom {})
-        assignment-version-callback (atom {})
-        supervisors-callback (atom nil)
-        backpressure-callback (atom {})   ;; we want to reigister a topo 
directory getChildren callback for all workers of this dir
-        assignments-callback (atom nil)
-        storm-base-callback (atom {})
-        blobstore-callback (atom nil)
-        credentials-callback (atom {})
-        log-config-callback (atom {})
-        state-id (.register
-                  cluster-state
-                  (fn [type path]
-                    (let [[subtree & args] (tokenize-path path)]
-                      (condp = subtree
-                         ASSIGNMENTS-ROOT (if (empty? args)
-                                             (issue-callback! 
assignments-callback)
-                                             (do
-                                               (issue-map-callback! 
assignment-info-callback (first args))
-                                               (issue-map-callback! 
assignment-version-callback (first args))
-                                               (issue-map-callback! 
assignment-info-with-version-callback (first args))))
-                         SUPERVISORS-ROOT (issue-callback! 
supervisors-callback)
-                         BLOBSTORE-ROOT (issue-callback! blobstore-callback) 
;; callback register for blobstore
-                         STORMS-ROOT (issue-map-callback! storm-base-callback 
(first args))
-                         CREDENTIALS-ROOT (issue-map-callback! 
credentials-callback (first args))
-                         LOGCONFIG-ROOT (issue-map-callback! 
log-config-callback (first args))
-                         BACKPRESSURE-ROOT (issue-map-callback! 
backpressure-callback (first args))
-                         ;; this should never happen
-                         (exit-process! 30 "Unknown callback for subtree " 
subtree args)))))]
-    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE 
WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
-               LOGCONFIG-SUBTREE BACKPRESSURE-SUBTREE]]
-      (.mkdirs cluster-state p acls))
-    (reify
-      StormClusterState
-
-      (assignments
-        [this callback]
-        (when callback
-          (reset! assignments-callback callback))
-        (.get_children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
-
-      (assignment-info
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-info-callback assoc storm-id callback))
-        (clojurify-assignment (maybe-deserialize (.get_data cluster-state 
(assignment-path storm-id) (not-nil? callback)) Assignment)))
-
-      (assignment-info-with-version 
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-info-with-version-callback assoc storm-id 
callback))
-        (let [{data :data version :version} 
-              (.get_data_with_version cluster-state (assignment-path storm-id) 
(not-nil? callback))]
-        {:data (clojurify-assignment (maybe-deserialize data Assignment))
-         :version version}))
-
-      (assignment-version 
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-version-callback assoc storm-id callback))
-        (.get_version cluster-state (assignment-path storm-id) (not-nil? 
callback)))
-
-      ;; blobstore state
-      (blobstore
-        [this callback]
-        (when callback
-          (reset! blobstore-callback callback))
-        (.sync_path cluster-state BLOBSTORE-SUBTREE)
-        (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? callback)))
-
-      (nimbuses
-        [this]
-        (map #(maybe-deserialize (.get_data cluster-state (nimbus-path %1) 
false) NimbusSummary)
-          (.get_children cluster-state NIMBUSES-SUBTREE false)))
-
-      (add-nimbus-host!
-        [this nimbus-id nimbus-summary]
-        ;explicit delete for ephmeral node to ensure this session creates the 
entry.
-        (.delete_node cluster-state (nimbus-path nimbus-id))
-
-        (.add_listener cluster-state (reify ClusterStateListener
-                        (^void stateChanged[this ^ConnectionState newState]
-                          (log-message "Connection state listener invoked, 
zookeeper connection state has changed to " newState)
-                          (if (.equals newState ConnectionState/RECONNECTED)
-                            (do
-                              (log-message "Connection state has changed to 
reconnected so setting nimbuses entry one more time")
-                              ;explicit delete for ephmeral node to ensure 
this session creates the entry.
-                              (.delete_node cluster-state (nimbus-path 
nimbus-id))
-                              (.set_ephemeral_node cluster-state (nimbus-path 
nimbus-id) (Utils/serialize nimbus-summary) acls))))))
-        
-        (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) 
(Utils/serialize nimbus-summary) acls))
-
-      (setup-blobstore!
-        [this key nimbusInfo versionInfo]
-        (let [path (str (blobstore-path key) "/" (.toHostPortString 
nimbusInfo) "-" versionInfo)]
-          (log-message "setup-path" path)
-          (.mkdirs cluster-state (blobstore-path key) acls)
-          ;we delete the node first to ensure the node gets created as part of 
this session only.
-          (.delete_node_blobstore cluster-state (str (blobstore-path key)) 
(.toHostPortString nimbusInfo))
-          (.set_ephemeral_node cluster-state path nil acls)))
-
-      (blobstore-info
-        [this blob-key]
-        (let [path (blobstore-path blob-key)]
-          (.sync_path cluster-state path)
-          (.get_children cluster-state path false)))
-
-      (active-storms
-        [this]
-        (.get_children cluster-state STORMS-SUBTREE false))
-
-      (active-keys
-        [this]
-        (.get_children cluster-state BLOBSTORE-SUBTREE false))
-
-      (heartbeat-storms
-        [this]
-        (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false))
-
-      (error-topologies
-        [this]
-        (.get_children cluster-state ERRORS-SUBTREE false))
-
-      (backpressure-topologies
-        [this]
-        (.get_children cluster-state BACKPRESSURE-SUBTREE false))
-
-      (get-worker-heartbeat
-        [this storm-id node port]
-        (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path 
storm-id node port) false)]
-          (if worker-hb
-            (-> worker-hb
-              (maybe-deserialize ClusterWorkerHeartbeat)
-              clojurify-zk-worker-hb))))
-
-      (executor-beats
-        [this storm-id executor->node+port]
-        ;; need to take executor->node+port in explicitly so that we don't run 
into a situation where a
-        ;; long dead worker with a skewed clock overrides all the timestamps. 
By only checking heartbeats
-        ;; with an assigned node+port, and only reading executors from that 
heartbeat that are actually assigned,
-        ;; we avoid situations like that
-        (let [node+port->executors (reverse-map executor->node+port)
-              all-heartbeats (for [[[node port] executors] 
node+port->executors]
-                               (->> (get-worker-heartbeat this storm-id node 
port)
-                                    (convert-executor-beats executors)
-                                    ))]
-          (apply merge all-heartbeats)))
-
-      (supervisors
-        [this callback]
-        (when callback
-          (reset! supervisors-callback callback))
-        (.get_children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
-
-      (supervisor-info
-        [this supervisor-id]
-        (clojurify-supervisor-info (maybe-deserialize (.get_data cluster-state 
(supervisor-path supervisor-id) false) SupervisorInfo)))
-
-      (topology-log-config
-        [this storm-id cb]
-        (when cb
-          (swap! log-config-callback assoc storm-id cb))
-        (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) 
(not-nil? cb)) LogConfig))
-
-      (set-topology-log-config!
-        [this storm-id log-config]
-        (.set_data cluster-state (log-config-path storm-id) (Utils/serialize 
log-config) acls))
-
-      (set-worker-profile-request
-        [this storm-id profile-request]
-        (let [request-type (.get_action profile-request)
-              host (.get_node (.get_nodeInfo profile-request))
-              port (first (.get_port (.get_nodeInfo profile-request)))]
-          (.set_data cluster-state
-                     (profiler-config-path storm-id host port request-type)
-                     (Utils/serialize profile-request)
-                     acls)))
-
-      (get-topology-profile-requests
-        [this storm-id thrift?]
-        (let [path (profiler-config-path storm-id)
-              requests (if (.node_exists cluster-state path false)
-                         (dofor [c (.get_children cluster-state path false)]
-                                (let [raw (.get_data cluster-state (str path 
"/" c) false)
-                                      request (maybe-deserialize raw 
ProfileRequest)]
-                                      (if thrift?
-                                        request
-                                        (clojurify-profile-request 
request)))))]
-          requests))
-
-      (delete-topology-profile-requests
-        [this storm-id profile-request]
-        (let [profile-request-inst (thriftify-profile-request profile-request)
-              action (:action profile-request)
-              host (:host profile-request)
-              port (:port profile-request)]
-          (.delete_node cluster-state
-           (profiler-config-path storm-id host port action))))
-          
-      (get-worker-profile-requests
-        [this storm-id node-info thrift?]
-        (let [host (:host node-info)
-              port (:port node-info)
-              profile-requests (get-topology-profile-requests this storm-id 
thrift?)]
-          (if thrift?
-            (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port 
(first (.get_port (.get_nodeInfo  %)))))
-                    profile-requests)
-            (filter #(and (= host (:host %)) (= port (:port %)))
-                    profile-requests))))
-      
-      (worker-heartbeat!
-        [this storm-id node port info]
-        (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
-          (if thrift-worker-hb
-            (.set_worker_hb cluster-state (workerbeat-path storm-id node port) 
(Utils/serialize thrift-worker-hb) acls))))
-
-      (remove-worker-heartbeat!
-        [this storm-id node port]
-        (.delete_worker_hb cluster-state (workerbeat-path storm-id node port)))
-
-      (setup-heartbeats!
-        [this storm-id]
-        (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls))
-
-      (teardown-heartbeats!
-        [this storm-id]
-        (try-cause
-          (.delete_worker_hb cluster-state (workerbeat-storm-root storm-id))
-          (catch KeeperException e
-            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
-
-      (worker-backpressure!
-        [this storm-id node port on?]
-        "if znode exists and to be not on?, delete; if exists and on?, do 
nothing;
-        if not exists and to be on?, create; if not exists and not on?, do 
nothing"
-        (let [path (backpressure-path storm-id node port)
-              existed (.node_exists cluster-state path false)]
-          (if existed
-            (if (not on?)
-              (.delete_node cluster-state path))   ;; delete the znode since 
the worker is not congested
-            (if on?
-              (.set_ephemeral_node cluster-state path nil acls))))) ;; create 
the znode since worker is congested
-    
-      (topology-backpressure
-        [this storm-id callback]
-        "if the backpresure/storm-id dir is not empty, this topology has 
throttle-on, otherwise throttle-off.
-         The backpressure/storm-id dir may not exist if nimbus has shutdown 
the topology"
-        (when callback
-          (swap! backpressure-callback assoc storm-id callback))
-        (let [path (backpressure-storm-root storm-id)
-              children (if (.node_exists cluster-state path false)
-                         (.get_children cluster-state path (not-nil? 
callback))) ]
-              (> (count children) 0)))
-      
-      (setup-backpressure!
-        [this storm-id]
-        (.mkdirs cluster-state (backpressure-storm-root storm-id) acls))
-
-      (remove-backpressure!
-        [this storm-id]
-        (.delete_node cluster-state (backpressure-storm-root storm-id)))
-
-      (remove-worker-backpressure!
-        [this storm-id node port]
-        (let [path (backpressure-path storm-id node port)
-              existed (.node_exists cluster-state path false)]
-          (if existed
-            (.delete_node cluster-state (backpressure-path storm-id node 
port)))))
-    
-      (teardown-topology-errors!
-        [this storm-id]
-        (try-cause
-          (.delete_node cluster-state (error-storm-root storm-id))
-          (catch KeeperException e
-            (log-warn-error e "Could not teardown errors for " storm-id))))
-
-      (supervisor-heartbeat!
-        [this supervisor-id info]
-        (let [thrift-supervisor-info (thriftify-supervisor-info info)]
-          (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) 
(Utils/serialize thrift-supervisor-info) acls)))
-
-      (activate-storm!
-        [this storm-id storm-base]
-        (let [thrift-storm-base (thriftify-storm-base storm-base)]
-          (.set_data cluster-state (storm-path storm-id) (Utils/serialize 
thrift-storm-base) acls)))
-
-      (update-storm!
-        [this storm-id new-elems]
-        (let [base (storm-base this storm-id nil)
-              executors (:component->executors base)
-              component->debug (:component->debug base)
-              new-elems (update new-elems :component->executors (partial merge 
executors))
-              new-elems (update new-elems :component->debug (partial 
merge-with merge component->debug))]
-          (.set_data cluster-state (storm-path storm-id)
-                    (-> base
-                        (merge new-elems)
-                        thriftify-storm-base
-                        Utils/serialize)
-                    acls)))
-
-      (storm-base
-        [this storm-id callback]
-        (when callback
-          (swap! storm-base-callback assoc storm-id callback))
-        (clojurify-storm-base (maybe-deserialize (.get_data cluster-state 
(storm-path storm-id) (not-nil? callback)) StormBase)))
-
-      (remove-storm-base!
-        [this storm-id]
-        (.delete_node cluster-state (storm-path storm-id)))
-
-      (set-assignment!
-        [this storm-id info]
-        (let [thrift-assignment (thriftify-assignment info)]
-          (.set_data cluster-state (assignment-path storm-id) (Utils/serialize 
thrift-assignment) acls)))
-
-      (remove-blobstore-key!
-        [this blob-key]
-        (log-debug "removing key" blob-key)
-        (.delete_node cluster-state (blobstore-path blob-key)))
-
-      (remove-key-version!
-        [this blob-key]
-        (.delete_node cluster-state (blobstore-max-key-sequence-number-path 
blob-key)))
-
-      (remove-storm!
-        [this storm-id]
-        (.delete_node cluster-state (assignment-path storm-id))
-        (.delete_node cluster-state (credentials-path storm-id))
-        (.delete_node cluster-state (log-config-path storm-id))
-        (.delete_node cluster-state (profiler-config-path storm-id))
-        (remove-storm-base! this storm-id))
-
-      (set-credentials!
-         [this storm-id creds topo-conf]
-         (let [topo-acls (mk-topo-only-acls topo-conf)
-               path (credentials-path storm-id)
-               thriftified-creds (thriftify-credentials creds)]
-           (.set_data cluster-state path (Utils/serialize thriftified-creds) 
topo-acls)))
-
-      (credentials
-        [this storm-id callback]
-        (when callback
-          (swap! credentials-callback assoc storm-id callback))
-        (clojurify-crdentials (maybe-deserialize (.get_data cluster-state 
(credentials-path storm-id) (not-nil? callback)) Credentials)))
-
-      (report-error
-         [this storm-id component-id node port error]
-         (let [path (error-path storm-id component-id)
-               last-error-path (last-error-path storm-id component-id)
-               data (thriftify-error {:time-secs (current-time-secs) :error 
(stringify-error error) :host node :port port})
-               _ (.mkdirs cluster-state path acls)
-               ser-data (Utils/serialize data)
-               _ (.mkdirs cluster-state path acls)
-               _ (.create_sequential cluster-state (str path "/e") ser-data 
acls)
-               _ (.set_data cluster-state last-error-path ser-data acls)
-               to-kill (->> (.get_children cluster-state path false)
-                            (sort-by parse-error-path)
-                            reverse
-                            (drop 10))]
-           (doseq [k to-kill]
-             (.delete_node cluster-state (str path "/" k)))))
-
-      (errors
-         [this storm-id component-id]
-         (let [path (error-path storm-id component-id)
-               errors (if (.node_exists cluster-state path false)
-                        (dofor [c (.get_children cluster-state path false)]
-                          (if-let [data (-> (.get_data cluster-state
-                                                      (str path "/" c)
-                                                      false)
-                                          (maybe-deserialize ErrorInfo)
-                                          clojurify-error)]
-                            (map->TaskError data)))
-                        ())]
-           (->> (filter not-nil? errors)
-                (sort-by (comp - :time-secs)))))
-
-      (last-error
-        [this storm-id component-id]
-        (let [path (last-error-path storm-id component-id)]
-          (if (.node_exists cluster-state path false)
-            (if-let [data (-> (.get_data cluster-state path false)
-                              (maybe-deserialize ErrorInfo)
-                              clojurify-error)]
-              (map->TaskError data)))))
-      
-      (disconnect
-         [this]
-        (.unregister cluster-state state-id)
-        (when solo?
-          (.close cluster-state))))))
-
-;; daemons have a single thread that will respond to events
-;; start with initialize event
-;; callbacks add events to the thread's queue
-
-;; keeps in memory cache of the state, only for what client subscribes to. Any 
subscription is automatically kept in sync, and when there are changes, client 
is notified.
-;; master gives orders through state, and client records status in state 
(ephemerally)
-
-;; master tells nodes what workers to launch
-
-;; master writes this. supervisors and workers subscribe to this to understand 
complete topology. each storm is a map from nodes to workers to tasks to ports 
whenever topology changes everyone will be notified
-;; master includes timestamp of each assignment so that appropriate time can 
be given to each worker to start up
-;; /assignments/{storm id}
-
-;; which tasks they talk to, etc. (immutable until shutdown)
-;; everyone reads this in full to understand structure
-;; /tasks/{storm id}/{task id} ; just contains bolt id
-
-;; supervisors send heartbeats here, master doesn't subscribe but checks 
asynchronously
-;; /supervisors/status/{ephemeral node ids}  ;; node metadata such as port 
ranges are kept here
-
-;; tasks send heartbeats here, master doesn't subscribe, just checks 
asynchronously
-;; /taskbeats/{storm id}/{ephemeral task id}
-
-;; contains data about whether it's started or not, tasks and workers 
subscribe to specific storm here to know when to shutdown
-;; master manipulates
-;; /storms/{storm id}
-
-;; Zookeeper flows:
-
-;; Master:
-;; job submit:
-;; 1. read which nodes are available
-;; 2. set up the worker/{storm}/{task} stuff (static)
-;; 3. set assignments
-;; 4. start storm - necessary in case master goes down, when goes back up can 
remember to take down the storm (2 states: on or off)
-
-;; Monitoring (or by checking when nodes go down or heartbeats aren't 
received):
-;; 1. read assignment
-;; 2. see which tasks/nodes are up
-;; 3. make new assignment to fix any problems
-;; 4. if a storm exists but is not taken down fully, ensure that storm 
takedown is launched (step by step remove tasks and finally remove assignments)
-
-;; masters only possible watches is on ephemeral nodes and tasks, and maybe 
not even
-
-;; Supervisor:
-;; 1. monitor /storms/* and assignments
-;; 2. local state about which workers are local
-;; 3. when storm is on, check that workers are running locally & start/kill if 
different than assignments
-;; 4. when storm is off, monitor tasks for workers - when they all die or 
don't hearbeat, kill the process and cleanup
-
-;; Worker:
-;; 1. On startup, start the tasks if the storm is on
-
-;; Task:
-;; 1. monitor assignments, reroute when assignments change
-;; 2. monitor storm (when storm turns off, error if assignments change) - take 
down tasks as master turns them off
-
-;; locally on supervisor: workers write pids locally on startup, supervisor 
deletes it on shutdown (associates pid with worker name)
-;; supervisor periodically checks to make sure processes are alive
-;; {rootdir}/workers/{storm id}/{worker id}   ;; contains pid inside
-
-;; all tasks in a worker share the same cluster state
-;; workers, supervisors, and tasks subscribes to storm to know when it's 
started or stopped
-;; on stopped, master removes records in order (tasks need to subscribe to 
themselves to see if they disappear)
-;; when a master removes a worker, the supervisor should kill it (and escalate 
to kill -9)
-;; on shutdown, tasks subscribe to tasks that send data to them to wait for 
them to die. when node disappears, they can die

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj 
b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
deleted file mode 100644
index 061c9e8..0000000
--- 
a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
+++ /dev/null
@@ -1,165 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.cluster-state.zookeeper-state-factory
-  (:import [org.apache.curator.framework.state ConnectionStateListener])
-  (:import [org.apache.zookeeper KeeperException$NoNodeException 
KeeperException$NodeExistsException]
-           [org.apache.storm.cluster ClusterState DaemonType])
-  (:import [org.apache.storm.utils StormConnectionStateConverter])
-  (:use [org.apache.storm cluster config log util])
-  (:require [org.apache.storm [zookeeper :as zk]])
-  (:gen-class
-   :implements [org.apache.storm.cluster.ClusterStateFactory]))
-
-(defn -mkState [this conf auth-conf acls context]
-  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf 
STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
-    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
-    (.close zk))
-  (let [callbacks (atom {})
-        active (atom true)
-        zk-writer (zk/mk-client conf
-                         (conf STORM-ZOOKEEPER-SERVERS)
-                         (conf STORM-ZOOKEEPER-PORT)
-                         :auth-conf auth-conf
-                         :root (conf STORM-ZOOKEEPER-ROOT)
-                         :watcher (fn [state type path]
-                                    (when @active
-                                      (when-not (= :connected state)
-                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Writer Zookeeper."))
-                                      (when-not (= :none type)
-                                        (doseq [callback (vals @callbacks)]
-                                          (callback type path))))))
-        is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS)
-        zk-reader (if is-nimbus?
-                    (zk/mk-client conf
-                         (conf STORM-ZOOKEEPER-SERVERS)
-                         (conf STORM-ZOOKEEPER-PORT)
-                         :auth-conf auth-conf
-                         :root (conf STORM-ZOOKEEPER-ROOT)
-                         :watcher (fn [state type path]
-                                    (when @active
-                                      (when-not (= :connected state)
-                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Reader Zookeeper."))
-                                      (when-not (= :none type)
-                                        (doseq [callback (vals @callbacks)]
-                                          (callback type path))))))
-                    zk-writer)]
-    (reify
-     ClusterState
-
-     (register
-       [this callback]
-       (let [id (uuid)]
-         (swap! callbacks assoc id callback)
-         id))
-
-     (unregister
-       [this id]
-       (swap! callbacks dissoc id))
-
-     (set-ephemeral-node
-       [this path data acls]
-       (zk/mkdirs zk-writer (parent-path path) acls)
-       (if (zk/exists zk-writer path false)
-         (try-cause
-           (zk/set-data zk-writer path data) ; should verify that it's 
ephemeral
-           (catch KeeperException$NoNodeException e
-             (log-warn-error e "Ephemeral node disappeared between checking 
for existing and setting data")
-             (zk/create-node zk-writer path data :ephemeral acls)))
-         (zk/create-node zk-writer path data :ephemeral acls)))
-
-     (create-sequential
-       [this path data acls]
-       (zk/create-node zk-writer path data :sequential acls))
-
-     (set-data
-       [this path data acls]
-       ;; note: this does not turn off any existing watches
-       (if (zk/exists zk-writer path false)
-         (zk/set-data zk-writer path data)
-         (do
-           (zk/mkdirs zk-writer (parent-path path) acls)
-           (try-cause
-             (zk/create-node zk-writer path data :persistent acls)
-             (catch KeeperException$NodeExistsException e
-               (zk/set-data zk-writer path data))))))
-
-     (set-worker-hb
-       [this path data acls]
-       (.set_data this path data acls))
-
-     (delete-node
-       [this path]
-       (zk/delete-node zk-writer path))
-
-     (delete-worker-hb
-       [this path]
-       (.delete_node this path))
-
-     (get-data
-       [this path watch?]
-       (zk/get-data zk-reader path watch?))
-
-     (get-data-with-version
-       [this path watch?]
-       (zk/get-data-with-version zk-reader path watch?))
-
-     (get-version
-       [this path watch?]
-       (zk/get-version zk-reader path watch?))
-
-     (get-worker-hb
-       [this path watch?]
-       (.get_data this path watch?))
-
-     (get-children
-       [this path watch?]
-       (zk/get-children zk-reader path watch?))
-
-     (get-worker-hb-children
-       [this path watch?]
-       (.get_children this path watch?))
-
-     (mkdirs
-       [this path acls]
-       (zk/mkdirs zk-writer path acls))
-
-     (node-exists
-       [this path watch?]
-       (zk/exists-node? zk-reader path watch?))
-
-     (add-listener
-       [this listener]
-       (let [curator-listener (reify ConnectionStateListener
-                                (stateChanged
-                                  [this client newState]
-                                  (.stateChanged listener 
(StormConnectionStateConverter/convert newState))))]
-         (zk/add-listener zk-reader curator-listener)))
-
-     (sync-path
-       [this path]
-       (zk/sync-path zk-writer path))
-
-      (delete-node-blobstore
-        [this path nimbus-host-port-info]
-        (zk/delete-node-blobstore zk-writer path nimbus-host-port-info))
-
-     (close
-       [this]
-       (reset! active false)
-       (.close zk-writer)
-       (if is-nimbus?
-         (.close zk-reader))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/activate.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/activate.clj 
b/storm-core/src/clj/backtype/storm/command/activate.clj
deleted file mode 100644
index dc452e8..0000000
--- a/storm-core/src/clj/backtype/storm/command/activate.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.activate
-  (:use [org.apache.storm thrift log])
-  (:gen-class))
-
-(defn -main [name] 
-  (with-configured-nimbus-connection nimbus
-    (.activate nimbus name)
-    (log-message "Activated topology: " name)
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/blobstore.clj 
b/storm-core/src/clj/backtype/storm/command/blobstore.clj
deleted file mode 100644
index b1496db..0000000
--- a/storm-core/src/clj/backtype/storm/command/blobstore.clj
+++ /dev/null
@@ -1,162 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.blobstore
-  (:import [java.io InputStream OutputStream]
-           [org.apache.storm.generated SettableBlobMeta AccessControl 
AuthorizationException
-            KeyNotFoundException]
-           [org.apache.storm.blobstore BlobStoreAclHandler])
-  (:use [org.apache.storm config]
-        [clojure.string :only [split]]
-        [clojure.tools.cli :only [cli]]
-        [clojure.java.io :only [copy input-stream output-stream]]
-        [org.apache.storm blobstore log util])
-  (:gen-class))
-
-(defn update-blob-from-stream
-  "Update a blob in the blob store from an InputStream"
-  [key ^InputStream in]
-  (with-configured-blob-client blobstore
-    (let [out (.updateBlob blobstore key)]
-      (try 
-        (copy in out)
-        (.close out)
-        (catch Exception e
-          (log-message e)
-          (.cancel out)
-          (throw e))))))
-
-(defn create-blob-from-stream
-  "Create a blob in the blob store from an InputStream"
-  [key ^InputStream in ^SettableBlobMeta meta]
-  (with-configured-blob-client blobstore
-    (let [out (.createBlob blobstore key meta)]
-      (try 
-        (copy in out)
-        (.close out)
-        (catch Exception e
-          (.cancel out)
-          (throw e))))))
-
-(defn read-blob
-  "Read a blob in the blob store and write to an OutputStream"
-  [key ^OutputStream out]
-  (with-configured-blob-client blobstore
-    (with-open [in (.getBlob blobstore key)]
-      (copy in out))))
-
-(defn as-access-control
-  "Convert a parameter to an AccessControl object"
-  [param]
-  (BlobStoreAclHandler/parseAccessControl (str param)))
-
-(defn as-acl
-  [param]
-  (map as-access-control (split param #",")))
-
-(defn access-control-str
-  [^AccessControl acl]
-  (BlobStoreAclHandler/accessControlToString acl))
-
-(defn read-cli [args]
-  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
-    (if file
-      (with-open [f (output-stream file)]
-        (read-blob key f))
-      (read-blob key System/out))))
-
-(defn update-cli [args]
-  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
-    (if file
-      (with-open [f (input-stream file)]
-        (update-blob-from-stream key f))
-      (update-blob-from-stream key System/in))
-    (log-message "Successfully updated " key)))
-
-(defn create-cli [args]
-  (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] 
(cli args ["-f" "--file" :default nil]
-                                                  ["-a" "--acl" :default [] 
:parse-fn as-acl]
-                                                  ["-r" "--replication-factor" 
:default -1 :parse-fn parse-int])
-        meta (doto (SettableBlobMeta. acl)
-                   (.set_replication_factor replication-factor))]
-    (validate-key-name! key)
-    (log-message "Creating " key " with ACL " (pr-str (map access-control-str 
acl)))
-    (if file
-      (with-open [f (input-stream file)]
-        (create-blob-from-stream key f meta))
-      (create-blob-from-stream key System/in meta))
-    (log-message "Successfully created " key)))
-
-(defn delete-cli [args]
-  (with-configured-blob-client blobstore
-    (doseq [key args]
-      (.deleteBlob blobstore key)
-      (log-message "deleted " key))))
-
-(defn list-cli [args]
-  (with-configured-blob-client blobstore
-    (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)]
-      (doseq [key keys]
-        (try
-          (let [meta (.getBlobMeta blobstore key)
-                version (.get_version meta)
-                acl (.get_acl (.get_settable meta))]
-            (log-message key " " version " " (pr-str (map access-control-str 
acl))))
-          (catch AuthorizationException ae
-            (if-not (empty? args) (log-error "ACCESS DENIED to key: " key)))
-          (catch KeyNotFoundException knf
-            (if-not (empty? args) (log-error key " NOT FOUND"))))))))
-
-(defn set-acl-cli [args]
-  (let [[{set-acl :set} [key] _]
-           (cli args ["-s" "--set" :default [] :parse-fn as-acl])]
-    (with-configured-blob-client blobstore
-      (let [meta (.getBlobMeta blobstore key)
-            acl (.get_acl (.get_settable meta))
-            new-acl (if set-acl set-acl acl)
-            new-meta (SettableBlobMeta. new-acl)]
-        (log-message "Setting ACL for " key " to " (pr-str (map 
access-control-str new-acl)))
-        (.setBlobMeta blobstore key new-meta)))))
-
-(defn rep-cli [args]
-  (let [sub-command (first args)
-        new-args (rest args)]
-    (with-configured-blob-client blobstore
-      (condp = sub-command
-      "--read" (let [key (first new-args)
-                     blob-replication (.getBlobReplication blobstore key)]
-                 (log-message "Current replication factor " blob-replication)
-                 blob-replication)
-      "--update" (let [[{replication-factor :replication-factor} [key] _]
-                        (cli new-args ["-r" "--replication-factor" :parse-fn 
parse-int])]
-                   (if (nil? replication-factor)
-                     (throw (RuntimeException. (str "Please set the 
replication factor")))
-                     (let [blob-replication (.updateBlobReplication blobstore 
key replication-factor)]
-                       (log-message "Replication factor is set to " 
blob-replication)
-                       blob-replication)))
-      :else (throw (RuntimeException. (str sub-command " is not a supported 
blobstore command")))))))
-
-(defn -main [& args]
-  (let [command (first args)
-        new-args (rest args)]
-    (condp = command
-      "cat" (read-cli new-args)
-      "create" (create-cli new-args)
-      "update" (update-cli new-args)
-      "delete" (delete-cli new-args)
-      "list" (list-cli new-args)
-      "set-acl" (set-acl-cli new-args)
-      "replication" (rep-cli new-args)
-      :else (throw (RuntimeException. (str command " is not a supported 
blobstore command"))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/config_value.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/config_value.clj 
b/storm-core/src/clj/backtype/storm/command/config_value.clj
deleted file mode 100644
index 9bc3e92..0000000
--- a/storm-core/src/clj/backtype/storm/command/config_value.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.config-value
-  (:use [org.apache.storm config log])
-  (:gen-class))
-
-
-(defn -main [^String name]
-  (let [conf (read-storm-config)]
-    (println "VALUE:" (conf name))
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/deactivate.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/deactivate.clj 
b/storm-core/src/clj/backtype/storm/command/deactivate.clj
deleted file mode 100644
index 4fd2c85..0000000
--- a/storm-core/src/clj/backtype/storm/command/deactivate.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.deactivate
-  (:use [org.apache.storm thrift log])
-  (:gen-class))
-
-(defn -main [name] 
-  (with-configured-nimbus-connection nimbus
-    (.deactivate nimbus name)
-    (log-message "Deactivated topology: " name)
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj 
b/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
deleted file mode 100644
index 96de02d..0000000
--- a/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
+++ /dev/null
@@ -1,26 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.dev-zookeeper
-  (:use [org.apache.storm zookeeper util config])
-  (:gen-class))
-
-(defn -main [& args]
-  (let [conf (read-storm-config)
-        port (conf STORM-ZOOKEEPER-PORT)
-        localpath (conf DEV-ZOOKEEPER-PATH)]
-    (rmr localpath)
-    (mk-inprocess-zookeeper localpath :port port)
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/get_errors.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/get_errors.clj 
b/storm-core/src/clj/backtype/storm/command/get_errors.clj
deleted file mode 100644
index c267390..0000000
--- a/storm-core/src/clj/backtype/storm/command/get_errors.clj
+++ /dev/null
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.get-errors
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm thrift log])
-  (:use [org.apache.storm util])
-  (:require [org.apache.storm.daemon
-             [nimbus :as nimbus]
-             [common :as common]])
-  (:import [org.apache.storm.generated GetInfoOptions NumErrorsChoice
-            TopologySummary ErrorInfo])
-  (:gen-class))
-
-(defn get-topology-id [name topologies]
-  (let [topology (first (filter #(= (.get_name %1) name) topologies))]
-    (when (not-nil? topology) (.get_id topology))))
-
-(defn get-component-errors
-  [topology-errors]
-  (apply hash-map (remove nil?
-                    (flatten (for [[comp-name comp-errors] topology-errors]
-                               (let [latest-error (when (not (empty? 
comp-errors)) (first comp-errors))]
-                                 (if latest-error [comp-name (.get_error 
^ErrorInfo latest-error)])))))))
-
-(defn -main [name]
-  (with-configured-nimbus-connection nimbus
-    (let [opts (doto (GetInfoOptions.)
-                 (.set_num_err_choice NumErrorsChoice/ONE))
-          cluster-info (.getClusterInfo nimbus)
-          topologies (.get_topologies cluster-info)
-          topo-id (get-topology-id name topologies)
-          topo-info (when (not-nil? topo-id) (.getTopologyInfoWithOpts nimbus 
topo-id opts))]
-      (if (or (nil? topo-id) (nil? topo-info))
-        (println (to-json {"Failure" (str "No topologies running with name " 
name)}))
-        (let [topology-name (.get_name topo-info)
-              topology-errors (.get_errors topo-info)]
-          (println (to-json (hash-map
-                              "Topology Name" topology-name
-                              "Comp-Errors" (get-component-errors 
topology-errors)))))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/healthcheck.clj 
b/storm-core/src/clj/backtype/storm/command/healthcheck.clj
deleted file mode 100644
index d96d7b3..0000000
--- a/storm-core/src/clj/backtype/storm/command/healthcheck.clj
+++ /dev/null
@@ -1,88 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.healthcheck
-  (:require [org.apache.storm
-             [config :refer :all]
-             [log :refer :all]]
-            [clojure.java [io :as io]]
-            [clojure [string :refer [split]]])
-  (:gen-class))
-
-(defn interrupter
-  "Interrupt a given thread after ms milliseconds."
-  [thread ms]
-  (let [interrupter (Thread.
-                     (fn []
-                       (try
-                         (Thread/sleep ms)
-                         (.interrupt thread)
-                         (catch InterruptedException e))))]
-    (.start interrupter)
-    interrupter))
-
-(defn check-output [lines]
-  (if (some #(.startsWith % "ERROR") lines)
-    :failed
-    :success))
-
-(defn process-script [conf script]
-  (let [script-proc (. (Runtime/getRuntime) (exec script))
-        curthread (Thread/currentThread)
-        interrupter-thread (interrupter curthread
-                                        (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
-    (try
-      (.waitFor script-proc)
-      (.interrupt interrupter-thread)
-      (if (not (= (.exitValue script-proc) 0))
-        :failed_with_exit_code
-        (check-output (split
-                       (slurp (.getInputStream script-proc))
-                       #"\n+")))
-      (catch InterruptedException e
-        (println "Script" script "timed out.")
-        :timeout)
-      (catch Exception e
-        (println "Script failed with exception: " e)
-        :failed_with_exception)
-      (finally (.interrupt interrupter-thread)))))
-
-(defn health-check [conf]
-  (let [health-dir (absolute-healthcheck-dir conf)
-        health-files (file-seq (io/file health-dir))
-        health-scripts (filter #(and (.canExecute %)
-                                     (not (.isDirectory %)))
-                               health-files)
-        results (->> health-scripts
-                     (map #(.getAbsolutePath %))
-                     (map (partial process-script conf)))]
-    (log-message
-     (pr-str (map #'vector
-                  (map #(.getAbsolutePath %) health-scripts)
-                  results)))
-    ; failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
-    ; We treat non-zero exit codes as indicators that the scripts failed
-    ; to execute properly, not that the system is unhealthy, in which case
-    ; we don't want to start killing things.
-    (if (every? #(or (= % :failed_with_exit_code)
-                     (= % :success))
-                results)
-      0
-      1)))
-
-(defn -main [& args]
-  (let [conf (read-storm-config)]
-    (System/exit
-     (health-check conf))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/heartbeats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/heartbeats.clj 
b/storm-core/src/clj/backtype/storm/command/heartbeats.clj
deleted file mode 100644
index ff28cba..0000000
--- a/storm-core/src/clj/backtype/storm/command/heartbeats.clj
+++ /dev/null
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.heartbeats
-  (:require [org.apache.storm
-             [config :refer :all]
-             [log :refer :all]
-             [cluster :refer :all]
-             [converter :refer :all]]
-        [clojure.string :refer :all])
-  (:import [org.apache.storm.generated ClusterWorkerHeartbeat]
-           [org.apache.storm.utils Utils])
-  (:gen-class))
-
-(defn -main [command path & args]
-  (let [conf (read-storm-config)
-        cluster (mk-distributed-cluster-state conf :auth-conf conf)]
-    (println "Command: [" command "]")
-    (condp = command
-      "list"
-      (let [message (join " \n" (.get_worker_hb_children cluster path false))]
-        (log-message "list " path ":\n"
-                     message "\n"))
-      "get"
-      (log-message 
-       (if-let [hb (.get_worker_hb cluster path false)]
-         (clojurify-zk-worker-hb
-          (Utils/deserialize
-           hb
-           ClusterWorkerHeartbeat))
-         "Nothing"))
-      
-      (log-message "Usage: heartbeats [list|get] path"))
-    
-    (try
-      (.close cluster)
-      (catch Exception e
-        (log-message "Caught exception: " e " on close."))))
-  (System/exit 0))
-         

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/kill_topology.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/kill_topology.clj 
b/storm-core/src/clj/backtype/storm/command/kill_topology.clj
deleted file mode 100644
index 84e0a64..0000000
--- a/storm-core/src/clj/backtype/storm/command/kill_topology.clj
+++ /dev/null
@@ -1,29 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.kill-topology
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm thrift config log])
-  (:import [org.apache.storm.generated KillOptions])
-  (:gen-class))
-
-(defn -main [& args]
-  (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil 
:parse-fn #(Integer/parseInt %)])
-        opts (KillOptions.)]
-    (if wait (.set_wait_secs opts wait))
-    (with-configured-nimbus-connection nimbus
-      (.killTopologyWithOpts nimbus name opts)
-      (log-message "Killed topology: " name)
-      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/kill_workers.clj 
b/storm-core/src/clj/backtype/storm/command/kill_workers.clj
deleted file mode 100644
index 2670735..0000000
--- a/storm-core/src/clj/backtype/storm/command/kill_workers.clj
+++ /dev/null
@@ -1,33 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.kill-workers
-  (:import [java.io File])
-  (:use [org.apache.storm.daemon common])
-  (:use [org.apache.storm util config])
-  (:require [org.apache.storm.daemon
-             [supervisor :as supervisor]])
-  (:gen-class))
-
-(defn -main 
-  "Construct the supervisor-data from scratch and kill the workers on this 
supervisor"
-  [& args]
-  (let [conf (read-storm-config)
-        conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) 
getCanonicalPath))
-        isupervisor (supervisor/standalone-supervisor)
-        supervisor-data (supervisor/supervisor-data conf nil isupervisor)
-        ids (supervisor/my-worker-ids conf)]
-    (doseq [id ids]
-      (supervisor/shutdown-worker supervisor-data id))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/list.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/list.clj 
b/storm-core/src/clj/backtype/storm/command/list.clj
deleted file mode 100644
index 87975cd..0000000
--- a/storm-core/src/clj/backtype/storm/command/list.clj
+++ /dev/null
@@ -1,38 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.list
-  (:use [org.apache.storm thrift log])
-  (:import [org.apache.storm.generated TopologySummary])
-  (:gen-class))
-
-(defn -main []
-  (with-configured-nimbus-connection nimbus
-    (let [cluster-info (.getClusterInfo nimbus)
-          topologies (.get_topologies cluster-info)
-          msg-format "%-20s %-10s %-10s %-12s %-10s"]
-      (if (or (nil? topologies) (empty? topologies))
-        (println "No topologies running.")
-        (do
-          (println (format msg-format "Topology_name" "Status" "Num_tasks" 
"Num_workers" "Uptime_secs"))
-          (println 
"-------------------------------------------------------------------")
-          (doseq [^TopologySummary topology topologies]
-            (let [topology-name (.get_name topology)
-                  topology-status (.get_status topology)
-                  topology-num-tasks (.get_num_tasks topology)
-                  topology-num-workers (.get_num_workers topology)
-                  topology-uptime-secs (.get_uptime_secs topology)]
-              (println (format msg-format  topology-name topology-status 
topology-num-tasks
-                               topology-num-workers 
topology-uptime-secs)))))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/monitor.clj 
b/storm-core/src/clj/backtype/storm/command/monitor.clj
deleted file mode 100644
index 7fa9b2a..0000000
--- a/storm-core/src/clj/backtype/storm/command/monitor.clj
+++ /dev/null
@@ -1,37 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.monitor
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm.thrift :only [with-configured-nimbus-connection]])
-  (:import [org.apache.storm.utils Monitor])
-  (:gen-class)
- )
-
-(defn -main [& args]
-  (let [[{interval :interval component :component stream :stream watch :watch} 
[name] _]
-        (cli args ["-i" "--interval" :default 4 :parse-fn #(Integer/parseInt 
%)]
-          ["-m" "--component" :default nil]
-          ["-s" "--stream" :default "default"]
-          ["-w" "--watch" :default "emitted"])
-        mon (Monitor.)]
-    (if interval (.set_interval mon interval))
-    (if name (.set_topology mon name))
-    (if component (.set_component mon component))
-    (if stream (.set_stream mon stream))
-    (if watch (.set_watch mon watch))
-    (with-configured-nimbus-connection nimbus
-      (.metrics mon nimbus)
-      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/rebalance.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/rebalance.clj 
b/storm-core/src/clj/backtype/storm/command/rebalance.clj
deleted file mode 100644
index 3868091..0000000
--- a/storm-core/src/clj/backtype/storm/command/rebalance.clj
+++ /dev/null
@@ -1,46 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.rebalance
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm thrift config log])
-  (:import [org.apache.storm.generated RebalanceOptions])
-  (:gen-class))
-
-(defn- parse-executor [^String s]
-  (let [eq-pos (.lastIndexOf s "=")
-        name (.substring s 0 eq-pos)
-        amt (.substring s (inc eq-pos))]
-    {name (Integer/parseInt amt)}
-    ))
-
-(defn -main [& args] 
-  (let [[{wait :wait executor :executor num-workers :num-workers} [name] _]
-                  (cli args ["-w" "--wait" :default nil :parse-fn 
#(Integer/parseInt %)]
-                            ["-n" "--num-workers" :default nil :parse-fn 
#(Integer/parseInt %)]
-                            ["-e" "--executor"  :parse-fn parse-executor
-                             :assoc-fn (fn [previous key val]
-                                         (assoc previous key
-                                                (if-let [oldval (get previous 
key)]
-                                                  (merge oldval val)
-                                                  val)))])
-        opts (RebalanceOptions.)]
-    (if wait (.set_wait_secs opts wait))
-    (if executor (.set_num_executors opts executor))
-    (if num-workers (.set_num_workers opts num-workers))
-    (with-configured-nimbus-connection nimbus
-      (.rebalance nimbus name opts)
-      (log-message "Topology " name " is rebalancing")
-      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/set_log_level.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/set_log_level.clj 
b/storm-core/src/clj/backtype/storm/command/set_log_level.clj
deleted file mode 100644
index 7e1c3c5..0000000
--- a/storm-core/src/clj/backtype/storm/command/set_log_level.clj
+++ /dev/null
@@ -1,75 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.set-log-level
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm thrift log])
-  (:import [org.apache.logging.log4j Level])
-  (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction])
-  (:gen-class))
-
-(defn- get-storm-id
-  "Get topology id for a running topology from the topology name."
-  [nimbus name]
-  (let [info (.getClusterInfo nimbus)
-        topologies (.get_topologies info)
-        topology (first (filter (fn [topo] (= name (.get_name topo))) 
topologies))]
-    (if topology 
-      (.get_id topology)
-      (throw (.IllegalArgumentException (str name " is not a running 
topology"))))))
-
-(defn- parse-named-log-levels [action]
-  "Parses [logger name]=[level string]:[optional timeout],[logger name2]...
-
-   e.g. ROOT=DEBUG:30
-        root logger, debug for 30 seconds
-
-        org.apache.foo=WARN
-        org.apache.foo set to WARN indefinitely"
-  (fn [^String s]
-    (let [log-args (re-find #"(.*)=([A-Z]+):?(\d*)" s)
-          name (if (= action LogLevelAction/REMOVE) s (nth log-args 1))
-          level (Level/toLevel (nth log-args 2))
-          timeout-str (nth log-args 3)
-          log-level (LogLevel.)]
-      (if (= action LogLevelAction/REMOVE)
-        (.set_action log-level action)
-        (do
-          (.set_action log-level action)
-          (.set_target_log_level log-level (.toString level))
-          (.set_reset_log_level_timeout_secs log-level
-            (Integer. (if (= timeout-str "") "0" timeout-str)))))
-      {name log-level})))
-
-(defn- merge-together [previous key val]
-   (assoc previous key
-      (if-let [oldval (get previous key)]
-         (merge oldval val)
-         val)))
-
-(defn -main [& args]
-  (let [[{log-setting :log-setting remove-log-setting :remove-log-setting} 
[name] _]
-        (cli args ["-l" "--log-setting"
-                   :parse-fn (parse-named-log-levels LogLevelAction/UPDATE)
-                   :assoc-fn merge-together]
-                  ["-r" "--remove-log-setting"
-                   :parse-fn (parse-named-log-levels LogLevelAction/REMOVE)
-                   :assoc-fn merge-together])
-        log-config (LogConfig.)]
-    (doseq [[log-name log-val] (merge log-setting remove-log-setting)]
-      (.put_to_named_logger_level log-config log-name log-val))
-    (log-message "Sent log config " log-config " for topology " name)
-    (with-configured-nimbus-connection nimbus
-      (.setLogConfig nimbus (get-storm-id nimbus name) log-config))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/shell_submission.clj 
b/storm-core/src/clj/backtype/storm/command/shell_submission.clj
deleted file mode 100644
index 887ab3b..0000000
--- a/storm-core/src/clj/backtype/storm/command/shell_submission.clj
+++ /dev/null
@@ -1,34 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.shell-submission
-  (:import [org.apache.storm StormSubmitter])
-  (:use [org.apache.storm thrift util config log zookeeper])
-  (:require [clojure.string :as str])
-  (:gen-class))
-
-
-(defn -main [^String tmpjarpath & args]
-  (let [conf (read-storm-config)
-        ; since this is not a purpose to add to leader lock queue, passing nil 
as blob-store is ok
-        zk-leader-elector (zk-leader-elector conf nil)
-        leader-nimbus (.getLeader zk-leader-elector)
-        host (.getHost leader-nimbus)
-        port (.getPort leader-nimbus)
-        no-op (.close zk-leader-elector)
-        jarpath (StormSubmitter/submitJar conf tmpjarpath)
-        args (concat args [host port jarpath])]
-    (exec-command! (str/join " " args))
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/upload_credentials.clj 
b/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
deleted file mode 100644
index f63bde4..0000000
--- a/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
+++ /dev/null
@@ -1,35 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.upload-credentials
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm log util])
-  (:import [org.apache.storm StormSubmitter])
-  (:import [java.util Properties])
-  (:import [java.io FileReader])
-  (:gen-class))
-
-(defn read-map [file-name]
-  (let [props (Properties. )
-        _ (.load props (FileReader. file-name))]
-    (clojurify-structure props)))
-
-(defn -main [& args]
-  (let [[{cred-file :file} [name & rawCreds]] (cli args ["-f" "--file" 
:default nil])
-        _ (when (and rawCreds (not (even? (.size rawCreds)))) (throw 
(RuntimeException.  "Need an even number of arguments to make a map")))
-        mapping (if rawCreds (apply assoc {} rawCreds) {})
-        file-mapping (if (nil? cred-file) {} (read-map cred-file))]
-      (StormSubmitter/pushCredentials name {} (merge file-mapping mapping))
-      (log-message "Uploaded new creds to topology: " name)))

Reply via email to