http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj 
b/storm-core/src/clj/org/apache/storm/config.clj
new file mode 100644
index 0000000..d65c439
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -0,0 +1,331 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.config
+  (:import [java.io FileReader File IOException]
+           [org.apache.storm.generated StormTopology])
+  (:import [org.apache.storm Config])
+  (:import [org.apache.storm.utils Utils LocalState])
+  (:import [org.apache.storm.validation ConfigValidation])
+  (:import [org.apache.commons.io FileUtils])
+  (:require [clojure [string :as str]])
+  (:use [org.apache.storm log util]))
+
+;; Directory name that supervisor-storm-resources-path appends to a topology root.
+(def RESOURCES-SUBDIR "resources")
+;; Marker string; not referenced elsewhere in this file
+;; (presumably consumed by nimbus code -- TODO confirm against callers).
+(def NIMBUS-DO-NOT-REASSIGN "NIMBUS-DO-NOT-REASSIGN")
+
+(defn- clojure-config-name
+  "Upper-cases `name` and replaces every underscore with a hyphen."
+  [name]
+  (-> name
+      .toUpperCase
+      (.replace "_" "-")))
+
+;; Define a Clojure constant for every configuration parameter.
+;; For each field returned by (.getFields Config), a var is def'd in this
+;; namespace: its name is the field name passed through clojure-config-name
+;; (upper-cased, "_" -> "-") and its value is that static field of Config.
+;; eval is needed because the var names are only known once the fields are read.
+(doseq [f (seq (.getFields Config))]
+  (let [name (.getName f)
+        new-name (clojure-config-name name)]
+    (eval
+      `(def ~(symbol new-name) (. Config ~(symbol name))))))
+
+;; The value of every field returned by (.getFields Config), read with a nil
+;; receiver (i.e. as static fields). `dofor` comes from the storm util namespace.
+(def ALL-CONFIGS
+  (dofor [f (seq (.getFields Config))]
+         (.get f nil)))
+
+
+(defn cluster-mode
+  "Returns the STORM-CLUSTER-MODE entry of `conf` as a keyword.
+  Any additional arguments are accepted and ignored."
+  [conf & args]
+  (-> (conf STORM-CLUSTER-MODE)
+      keyword))
+
+(defn local-mode?
+  "True when conf's STORM-CLUSTER-MODE is \"local\", false when it is
+  \"distributed\"; any other value throws IllegalArgumentException."
+  [conf]
+  (let [mode (conf STORM-CLUSTER-MODE)]
+    (case mode
+      "local" true
+      "distributed" false
+      (throw (IllegalArgumentException.
+               (str "Illegal cluster mode in conf: " mode))))))
+
+(defn sampling-rate
+  "Returns (int (/ 1 rate)) where rate is conf's TOPOLOGY-STATS-SAMPLE-RATE."
+  [conf]
+  (let [rate (conf TOPOLOGY-STATS-SAMPLE-RATE)]
+    (int (/ 1 rate))))
+
+(defn mk-stats-sampler
+  "Builds an even-sampler from the sampling rate computed for `conf`."
+  [conf]
+  (-> conf
+      sampling-rate
+      even-sampler))
+
+(defn read-default-config
+  "Returns the result of Utils/readDefaultConfig passed through clojurify-structure."
+  []
+  (-> (Utils/readDefaultConfig)
+      clojurify-structure))
+
+(defn validate-configs-with-schemas
+  "Delegates validation of `conf` to ConfigValidation/validateFields and
+  returns whatever that call returns."
+  [conf]
+  (-> conf
+      ConfigValidation/validateFields))
+
+(defn read-storm-config
+  "Reads the storm config via Utils/readStormConfig, clojurifies it, runs
+  validate-configs-with-schemas on it and returns the clojurified config."
+  []
+  (doto (clojurify-structure (Utils/readStormConfig))
+    validate-configs-with-schemas))
+
+(defn read-yaml-config
+  "Reads the config file `name` via Utils/findAndReadConfigFile, clojurifies it,
+  validates it with validate-configs-with-schemas and returns it.
+  `must-exist` is forwarded to Utils/findAndReadConfigFile; the one-argument
+  arity passes true for it."
+  ([name must-exist]
+   (let [conf (clojurify-structure (Utils/findAndReadConfigFile name must-exist))]
+     (validate-configs-with-schemas conf)
+     conf))
+  ([name]
+   ;; Forward `name` explicitly. Calling (read-yaml-config true) would re-enter
+   ;; this same arity with name = true and recurse until the stack overflows.
+   (read-yaml-config name true)))
+
+(defn absolute-storm-local-dir
+  "Resolves conf's STORM-LOCAL-DIR: an absolute path is returned unchanged, a
+  relative one is placed under the \"storm.home\" system property, and a missing
+  one defaults to \"storm-local\" under \"storm.home\"."
+  [conf]
+  (let [storm-home (System/getProperty "storm.home")
+        path (conf STORM-LOCAL-DIR)]
+    (cond
+      (not path) (str storm-home file-path-separator "storm-local")
+      (is-absolute-path? path) path
+      :else (str storm-home file-path-separator path))))
+
+;; Canonical path of the log directory, computed once at namespace load time.
+;; First non-nil of: the "storm.log.dir" system property, the "storm.log.dir"
+;; entry of (read-storm-config), or "logs" under the "storm.home" property.
+(def LOG-DIR
+  (let [log-dir (or (System/getProperty "storm.log.dir")
+                    (get (read-storm-config) "storm.log.dir")
+                    (str (System/getProperty "storm.home") file-path-separator "logs"))]
+    (.getCanonicalPath (clojure.java.io/file log-dir))))
+
+(defn absolute-healthcheck-dir
+  "Resolves conf's STORM-HEALTH-CHECK-DIR: an absolute path is returned
+  unchanged, a relative one is placed under the \"storm.home\" system property,
+  and a missing one defaults to \"healthchecks\" under \"storm.home\"."
+  [conf]
+  (let [storm-home (System/getProperty "storm.home")
+        path (conf STORM-HEALTH-CHECK-DIR)]
+    (cond
+      (not path) (str storm-home file-path-separator "healthchecks")
+      (is-absolute-path? path) path
+      :else (str storm-home file-path-separator path))))
+
+(defn master-local-dir
+  "Returns the \"nimbus\" directory under the absolute storm local dir, calling
+  FileUtils/forceMkdir on it every time."
+  [conf]
+  (let [nimbus-dir (-> (absolute-storm-local-dir conf)
+                       (str file-path-separator "nimbus"))]
+    (FileUtils/forceMkdir (File. nimbus-dir))
+    nimbus-dir))
+
+(defn master-stormjar-key
+  "Returns `topology-id` suffixed with \"-stormjar.jar\"."
+  [topology-id]
+  (-> topology-id
+      (str "-stormjar.jar")))
+
+(defn master-stormcode-key
+  "Returns `topology-id` suffixed with \"-stormcode.ser\"."
+  [topology-id]
+  (-> topology-id
+      (str "-stormcode.ser")))
+
+(defn master-stormconf-key
+  "Returns `topology-id` suffixed with \"-stormconf.ser\"."
+  [topology-id]
+  (-> topology-id
+      (str "-stormconf.ser")))
+
+(defn master-stormdist-root
+  "With one argument: the \"stormdist\" directory under the master local dir.
+  With a storm-id: that topology's sub-directory of the stormdist root."
+  ([conf]
+   (-> (master-local-dir conf)
+       (str file-path-separator "stormdist")))
+  ([conf storm-id]
+   (-> (master-stormdist-root conf)
+       (str file-path-separator storm-id))))
+
+(defn master-tmp-dir
+  "Returns the \"tmp\" directory under the master local dir, calling
+  FileUtils/forceMkdir on it every time."
+  [conf]
+  (let [tmp-dir (-> (master-local-dir conf)
+                    (str file-path-separator "tmp"))]
+    (FileUtils/forceMkdir (File. tmp-dir))
+    tmp-dir))
+
+(defn read-supervisor-storm-conf-given-path
+  "Reads the file at `stormconf-path` as bytes, decodes it with
+  Utils/fromCompressedJsonConf, clojurifies it and merges it over `conf`
+  (entries from the file win)."
+  [conf stormconf-path]
+  (->> (File. stormconf-path)
+       FileUtils/readFileToByteArray
+       Utils/fromCompressedJsonConf
+       clojurify-structure
+       (merge conf)))
+
+(defn master-storm-metafile-path
+  "Path of \"storm-code-distributor.meta\" inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator "storm-code-distributor.meta")))
+
+(defn master-stormjar-path
+  "Path of \"stormjar.jar\" inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator "stormjar.jar")))
+
+(defn master-stormcode-path
+  "Path of \"stormcode.ser\" inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator "stormcode.ser")))
+
+(defn master-stormconf-path
+  "Path of \"stormconf.ser\" inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator "stormconf.ser")))
+
+(defn master-inbox
+  "Returns the \"inbox\" directory under the master local dir, calling
+  FileUtils/forceMkdir on it every time."
+  [conf]
+  (let [inbox-dir (-> (master-local-dir conf)
+                      (str file-path-separator "inbox"))]
+    (FileUtils/forceMkdir (File. inbox-dir))
+    inbox-dir))
+
+(defn master-inimbus-dir
+  "Path of the \"inimbus\" directory under the master local dir
+  (the inimbus directory itself is not created here)."
+  [conf]
+  (-> (master-local-dir conf)
+      (str file-path-separator "inimbus")))
+
+(defn supervisor-local-dir
+  "Returns the \"supervisor\" directory under the absolute storm local dir,
+  calling FileUtils/forceMkdir on it every time."
+  [conf]
+  (let [supervisor-dir (-> (absolute-storm-local-dir conf)
+                           (str file-path-separator "supervisor"))]
+    (FileUtils/forceMkdir (File. supervisor-dir))
+    supervisor-dir))
+
+(defn supervisor-isupervisor-dir
+  "Path of the \"isupervisor\" directory under the supervisor local dir
+  (the isupervisor directory itself is not created here)."
+  [conf]
+  (-> (supervisor-local-dir conf)
+      (str file-path-separator "isupervisor")))
+
+(defn supervisor-stormdist-root
+  "With one argument: the \"stormdist\" directory under the supervisor local dir.
+  With a storm-id: the sub-directory named by the url-encoded storm-id."
+  ([conf]
+   (-> (supervisor-local-dir conf)
+       (str file-path-separator "stormdist")))
+  ([conf storm-id]
+   (-> (supervisor-stormdist-root conf)
+       (str file-path-separator (url-encode storm-id)))))
+
+(defn supervisor-stormjar-path
+  "Path of \"stormjar.jar\" inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator "stormjar.jar")))
+
+(defn supervisor-storm-metafile-path
+  "Path of \"storm-code-distributor.meta\" inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator "storm-code-distributor.meta")))
+
+(defn supervisor-stormcode-path
+  "Path of \"stormcode.ser\" inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator "stormcode.ser")))
+
+(defn supervisor-stormconf-path
+  "Path of \"stormconf.ser\" inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator "stormconf.ser")))
+
+(defn supervisor-tmp-dir
+  "Returns the \"tmp\" directory under the supervisor local dir, calling
+  FileUtils/forceMkdir on it every time."
+  [conf]
+  (let [tmp-dir (-> (supervisor-local-dir conf)
+                    (str file-path-separator "tmp"))]
+    (FileUtils/forceMkdir (File. tmp-dir))
+    tmp-dir))
+
+(defn supervisor-storm-resources-path
+  "Path of the RESOURCES-SUBDIR directory inside `stormroot`."
+  [stormroot]
+  (-> stormroot
+      (str file-path-separator RESOURCES-SUBDIR)))
+
+(defn ^LocalState supervisor-state
+  "Constructs a LocalState over the \"localstate\" path under the supervisor
+  local dir."
+  [conf]
+  (let [state-path (-> (supervisor-local-dir conf)
+                       (str file-path-separator "localstate"))]
+    (LocalState. state-path)))
+
+(defn ^LocalState nimbus-topo-history-state
+  "Constructs a LocalState over the \"history\" path under the master local dir."
+  [conf]
+  (let [history-path (-> (master-local-dir conf)
+                         (str file-path-separator "history"))]
+    (LocalState. history-path)))
+
+(defn read-supervisor-storm-conf
+  "Locates the stormconf file of topology `storm-id` under the supervisor
+  stormdist root and merges its contents over `conf`."
+  [conf storm-id]
+  (->> (supervisor-stormdist-root conf storm-id)
+       supervisor-stormconf-path
+       (read-supervisor-storm-conf-given-path conf)))
+
+(defn read-supervisor-topology
+  "Reads the stormcode file of topology `storm-id` under the supervisor
+  stormdist root and deserializes it as a StormTopology via Utils/deserialize."
+  [conf storm-id]
+  (let [code-file (-> (supervisor-stormdist-root conf storm-id)
+                      supervisor-stormcode-path
+                      File.)]
+    (Utils/deserialize (FileUtils/readFileToByteArray code-file) StormTopology)))
+
+(defn worker-user-root
+  "Returns the absolute storm local dir followed by \"/workers-users\"
+  (a literal \"/\" is used here, not file-path-separator)."
+  [conf]
+  (-> (absolute-storm-local-dir conf)
+      (str "/workers-users")))
+
+(defn worker-user-file
+  "Path of the file named `worker-id` under worker-user-root
+  (joined with a literal \"/\")."
+  [conf worker-id]
+  (-> (worker-user-root conf)
+      (str "/" worker-id)))
+
+(defn get-worker-user
+  "Logs the lookup, then returns the trimmed contents of the worker-user file
+  for `worker-id`. An IOException is logged as a warning and yields nil."
+  [conf worker-id]
+  (log-message "GET worker-user " worker-id)
+  (try
+    (-> (worker-user-file conf worker-id)
+        slurp
+        str/trim)
+    (catch IOException e
+      (log-warn-error e "Failed to get worker user for " worker-id ".")
+      nil)))
+
+(defn get-id-from-blob-key
+  "Returns the part of `key` that precedes a trailing \"-stormjar.jar\",
+  \"-stormcode.ser\" or \"-stormconf.ser\"; nil when `key` has none of them."
+  [key]
+  (when-let [[_ id] (re-find #"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)]
+    id))
+
+(defn set-worker-user!
+  "Logs the update, creates the parent directories of the worker-user file for
+  `worker-id` and writes `user` into that file."
+  [conf worker-id user]
+  (log-message "SET worker-user " worker-id " " user)
+  (let [path (worker-user-file conf worker-id)]
+    (-> (File. path)
+        .getParentFile
+        .mkdirs)
+    (spit path user)))
+
+(defn remove-worker-user!
+  "Logs the removal and deletes the worker-user file for `worker-id`,
+  returning the result of File.delete."
+  [conf worker-id]
+  (log-message "REMOVE worker-user " worker-id)
+  (-> (worker-user-file conf worker-id)
+      File.
+      .delete))
+
+(defn worker-artifacts-root
+  "One argument: conf's STORM-WORKERS-ARTIFACTS-DIR when absolute, otherwise
+  that value (or \"workers-artifacts\" when unset) under LOG-DIR.
+  With `id`: the id sub-directory of that root.
+  With `id` and `port`: the port sub-directory of the id directory."
+  ([conf]
+   (let [configured-dir (conf STORM-WORKERS-ARTIFACTS-DIR)]
+     (cond
+       (not configured-dir) (str LOG-DIR file-path-separator "workers-artifacts")
+       (is-absolute-path? configured-dir) configured-dir
+       :else (str LOG-DIR file-path-separator configured-dir))))
+  ([conf id]
+   (-> (worker-artifacts-root conf)
+       (str file-path-separator id)))
+  ([conf id port]
+   (-> (worker-artifacts-root conf id)
+       (str file-path-separator port))))
+
+(defn worker-artifacts-pid-path
+  "Path of \"worker.pid\" inside the artifacts directory for `id` and `port`."
+  [conf id port]
+  (-> (worker-artifacts-root conf id port)
+      (str file-path-separator "worker.pid")))
+
+(defn get-log-metadata-file
+  "Returns a java.io.File for the \"worker.yaml\" metadata of a worker.
+  One argument: `fname` is split on file-path-separator; its first two
+  segments are used as id and port, and the conf comes from (read-storm-config).
+  Three arguments: the file is located under the artifacts directory of `id`
+  and `port` for the given `conf`."
+  ([fname]
+   ;; Quote the separator before compiling it: a separator that is a regex
+   ;; metacharacter (e.g. a backslash, the Windows file separator) would
+   ;; otherwise make re-pattern throw PatternSyntaxException.
+   (let [separator-re (re-pattern (java.util.regex.Pattern/quote file-path-separator))
+         [id port & _] (str/split fname separator-re)]
+     (get-log-metadata-file (read-storm-config) id port)))
+  ([conf id port]
+   (clojure.java.io/file (str (worker-artifacts-root conf id) file-path-separator port file-path-separator) "worker.yaml")))
+
+(defn get-worker-dir-from-root
+  "Returns a java.io.File for <log-root>/<id>/<port>, joined with
+  file-path-separator."
+  [log-root id port]
+  (-> log-root
+      (str file-path-separator id file-path-separator port)
+      clojure.java.io/file))
+
+(defn worker-root
+  "One argument: the \"workers\" directory under the absolute storm local dir.
+  With `id`: the id sub-directory of that directory."
+  ([conf]
+   (-> (absolute-storm-local-dir conf)
+       (str file-path-separator "workers")))
+  ([conf id]
+   (-> (worker-root conf)
+       (str file-path-separator id))))
+
+(defn worker-pids-root
+  "Path of the \"pids\" directory inside the worker root of `id`."
+  [conf id]
+  (-> (worker-root conf id)
+      (str file-path-separator "pids")))
+
+(defn worker-pid-path
+  "Path of the entry named `pid` inside the pids directory of worker `id`."
+  [conf id pid]
+  (-> (worker-pids-root conf id)
+      (str file-path-separator pid)))
+
+(defn worker-heartbeats-root
+  "Path of the \"heartbeats\" directory inside the worker root of `id`."
+  [conf id]
+  (-> (worker-root conf id)
+      (str file-path-separator "heartbeats")))
+
+;; workers heartbeat here with pid and timestamp
+;; if supervisor stops receiving heartbeat, it kills and restarts the process
+;; in local mode, keep a global map of ids to threads for simulating process
+;; management
+(defn ^LocalState worker-state
+  "Constructs a LocalState over the heartbeats directory of worker `id`."
+  [conf id]
+  (-> (worker-heartbeats-root conf id)
+      LocalState.))
+
+(defn override-login-config-with-system-property
+  "When the \"java.security.auth.login.config\" system property is set, returns
+  `conf` with that value stored under the same key; otherwise returns `conf`
+  unchanged."
+  [conf]
+  (let [login-conf-file (System/getProperty "java.security.auth.login.config")]
+    (if login-conf-file
+      (assoc conf "java.security.auth.login.config" login-conf-file)
+      conf)))
+
+(defn get-topo-logs-users
+  "Sorted, de-duplicated, nil-free concatenation of the LOGS-USERS and
+  TOPOLOGY-USERS entries of `topology-conf`."
+  [topology-conf]
+  (->> (concat (topology-conf LOGS-USERS)
+               (topology-conf TOPOLOGY-USERS))
+       (remove nil?)
+       distinct
+       sort))
+
+(defn get-topo-logs-groups
+  "Sorted, de-duplicated, nil-free concatenation of the LOGS-GROUPS and
+  TOPOLOGY-GROUPS entries of `topology-conf`."
+  [topology-conf]
+  (->> (concat (topology-conf LOGS-GROUPS)
+               (topology-conf TOPOLOGY-GROUPS))
+       (remove nil?)
+       distinct
+       sort))
+

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj 
b/storm-core/src/clj/org/apache/storm/converter.clj
new file mode 100644
index 0000000..bb2dc87
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -0,0 +1,277 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.converter
+  (:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment 
WorkerResources
+            StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo 
ErrorInfo Credentials RebalanceOptions KillOptions
+            TopologyActionOptions DebugOptions ProfileRequest])
+  (:use [org.apache.storm util stats log])
+  (:require [org.apache.storm.daemon [common :as common]]))
+
+(defn thriftify-supervisor-info
+  "Copies the entries of `supervisor-info` into a new thrift SupervisorInfo.
+  time-secs and uptime-secs are coerced with long; used-ports and meta are
+  mapped element-wise through long."
+  [supervisor-info]
+  (let [{:keys [time-secs hostname assignment-id used-ports scheduler-meta
+                uptime-secs version resources-map]} supervisor-info
+        meta-values (:meta supervisor-info)]
+    (doto (SupervisorInfo.)
+      (.set_time_secs (long time-secs))
+      (.set_hostname hostname)
+      (.set_assignment_id assignment-id)
+      (.set_used_ports (map long used-ports))
+      (.set_meta (map long meta-values))
+      (.set_scheduler_meta scheduler-meta)
+      (.set_uptime_secs (long uptime-secs))
+      (.set_version version)
+      (.set_resources_map resources-map))))
+
+(defn clojurify-supervisor-info
+  "Builds an org.apache.storm.daemon.common.SupervisorInfo record from a thrift
+  SupervisorInfo; returns nil for a nil argument. Collection fields are copied
+  into Clojure vectors/maps, and stay nil when the thrift getter returns nil."
+  [^SupervisorInfo supervisor-info]
+  (when supervisor-info
+    (let [->vector #(when % (into [] %))
+          ->hash-map #(when % (into {} %))]
+      (org.apache.storm.daemon.common.SupervisorInfo.
+        (.get_time_secs supervisor-info)
+        (.get_hostname supervisor-info)
+        (.get_assignment_id supervisor-info)
+        (->vector (.get_used_ports supervisor-info))
+        (->vector (.get_meta supervisor-info))
+        (->hash-map (.get_scheduler_meta supervisor-info))
+        (.get_uptime_secs supervisor-info)
+        (.get_version supervisor-info)
+        (->hash-map (.get_resources_map supervisor-info))))))
+
+(defn thriftify-assignment
+  "Converts an assignment map/record into a thrift Assignment.
+  Executor keys are mapped element-wise through long; each [node port ...] value
+  becomes a NodeInfo whose ports are a set of longs. Worker resources are only
+  set when :worker->resources is present; each [on-heap off-heap cpu] value
+  becomes a WorkerResources (cpu is taken from the last element)."
+  [assignment]
+  (let [->node-info (fn [node+port]
+                      (NodeInfo. (first node+port) (set (map long (rest node+port)))))
+        executor->node-info (into {}
+                                  (map (fn [[executor node+port]]
+                                         [(map long executor) (->node-info node+port)])
+                                       (:executor->node+port assignment)))
+        executor->start-time (into {}
+                                   (map (fn [[executor start-time]]
+                                          [(map long executor) (long start-time)])
+                                        (:executor->start-time-secs assignment)))
+        thrift-assignment (doto (Assignment.)
+                            (.set_master_code_dir (:master-code-dir assignment))
+                            (.set_node_host (:node->host assignment))
+                            (.set_executor_node_port executor->node-info)
+                            (.set_executor_start_time_secs executor->start-time))]
+    (when-let [worker->resources (:worker->resources assignment)]
+      (.set_worker_resources
+        thrift-assignment
+        (into {}
+              (map (fn [[node+port resources]]
+                     [(->node-info node+port)
+                      (doto (WorkerResources.)
+                        (.set_mem_on_heap (first resources))
+                        (.set_mem_off_heap (second resources))
+                        (.set_cpu (last resources)))])
+                   worker->resources))))
+    thrift-assignment))
+
+(defn clojurify-executor->node_port
+  "Converts a map of executor-list -> NodeInfo into a Clojure map of
+  executor-vector -> (node port1 port2 ...)."
+  [executor->node_port]
+  (->> executor->node_port
+       ;; the list of executors must be converted to a clojure vector to ensure it is sortable.
+       (map-key (fn [list-of-executors]
+                  (into [] list-of-executors)))
+       ;; nodeInfo should be converted to [node,port1,port2..]
+       (map-val (fn [nodeInfo]
+                  (concat [(.get_node nodeInfo)] (.get_port nodeInfo))))
+       (into {})))
+
+(defn clojurify-worker->resources
+  "Converts a map of NodeInfo -> WorkerResources into a Clojure map:
+   each NodeInfo key becomes the sequence (node port ...) and
+   each resources value becomes [mem_on_heap mem_off_heap cpu]."
+  ;; The docstring must precede the parameter vector; placed after it, the
+  ;; string is just a discarded body expression and never reaches the var's :doc.
+  [worker->resources]
+  (into {} (map
+             (fn [[nodeInfo resources]]
+               [(concat [(.get_node nodeInfo)] (.get_port nodeInfo))
+                [(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]])
+             worker->resources)))
+
+(defn clojurify-assignment
+  "Builds an org.apache.storm.daemon.common.Assignment record from a thrift
+  Assignment; returns nil for a nil argument. Executor keys of the start-time
+  map are copied into vectors."
+  [^Assignment assignment]
+  (when assignment
+    (let [node->host (into {} (.get_node_host assignment))
+          executor->node+port (clojurify-executor->node_port
+                                (into {} (.get_executor_node_port assignment)))
+          executor->start-time (map-key (fn [executor] (into [] executor))
+                                        (into {} (.get_executor_start_time_secs assignment)))
+          worker->resources (clojurify-worker->resources
+                              (into {} (.get_worker_resources assignment)))]
+      (org.apache.storm.daemon.common.Assignment.
+        (.get_master_code_dir assignment)
+        node->host
+        executor->node+port
+        executor->start-time
+        worker->resources))))
+
+(defn convert-to-symbol-from-status
+  "Maps a TopologyStatus enum value to a {:type <keyword>} map; any other
+  value (including nil) yields nil."
+  [status]
+  (get {TopologyStatus/ACTIVE {:type :active}
+        TopologyStatus/INACTIVE {:type :inactive}
+        TopologyStatus/REBALANCING {:type :rebalancing}
+        TopologyStatus/KILLED {:type :killed}}
+       status))
+
+(defn- convert-to-status-from-symbol
+  "Maps the :type of a status map to the matching TopologyStatus enum value;
+  nil for a nil status or an unrecognized :type."
+  [status]
+  (when status
+    (case (:type status)
+      :active TopologyStatus/ACTIVE
+      :inactive TopologyStatus/INACTIVE
+      :rebalancing TopologyStatus/REBALANCING
+      :killed TopologyStatus/KILLED
+      nil)))
+
+(defn clojurify-rebalance-options
+  "Converts thrift RebalanceOptions into a map with :action :rebalance plus
+  :delay-secs, :num-workers and :component->executors for the fields that are
+  set on the thrift object."
+  [^RebalanceOptions rebalance-options]
+  (let [delay-secs (when (.is_set_wait_secs rebalance-options)
+                     (.get_wait_secs rebalance-options))
+        num-workers (when (.is_set_num_workers rebalance-options)
+                      (.get_num_workers rebalance-options))
+        component->executors (when (.is_set_num_executors rebalance-options)
+                               (into {} (.get_num_executors rebalance-options)))]
+    (-> {:action :rebalance}
+      (assoc-non-nil :delay-secs delay-secs)
+      (assoc-non-nil :num-workers num-workers)
+      (assoc-non-nil :component->executors component->executors))))
+
+(defn thriftify-rebalance-options
+  "Converts a rebalance-options map into thrift RebalanceOptions, setting only
+  the fields whose map entries are truthy; returns nil for a nil/false argument."
+  [rebalance-options]
+  (when rebalance-options
+    (let [thrift-rebalance-options (RebalanceOptions.)]
+      (when-let [delay-secs (:delay-secs rebalance-options)]
+        (.set_wait_secs thrift-rebalance-options (int delay-secs)))
+      (when-let [num-workers (:num-workers rebalance-options)]
+        (.set_num_workers thrift-rebalance-options (int num-workers)))
+      (when-let [component->executors (:component->executors rebalance-options)]
+        (.set_num_executors thrift-rebalance-options (map-val int component->executors)))
+      thrift-rebalance-options)))
+
+(defn clojurify-kill-options
+  "Converts thrift KillOptions into a map with :action :kill, plus :delay-secs
+  when wait_secs is set on the thrift object."
+  [^KillOptions kill-options]
+  (let [delay-secs (when (.is_set_wait_secs kill-options)
+                     (.get_wait_secs kill-options))]
+    (assoc-non-nil {:action :kill} :delay-secs delay-secs)))
+
+(defn thriftify-kill-options
+  "Converts a kill-options map into thrift KillOptions, setting wait_secs only
+  when :delay-secs is truthy; returns nil for a nil/false argument."
+  [kill-options]
+  (when kill-options
+    (let [thrift-kill-options (KillOptions.)]
+      (when-let [delay-secs (:delay-secs kill-options)]
+        (.set_wait_secs thrift-kill-options (int delay-secs)))
+      thrift-kill-options)))
+
+(defn thriftify-topology-action-options
+  "Converts the :topology-action-options of `storm-base` into thrift
+  TopologyActionOptions, filling the kill or rebalance member according to
+  :action. Returns nil when storm-base has no :topology-action-options."
+  [storm-base]
+  (when-let [topology-action-options (:topology-action-options storm-base)]
+    (let [thrift-topology-action-options (TopologyActionOptions.)]
+      (case (:action topology-action-options)
+        :kill (.set_kill_options thrift-topology-action-options
+                                 (thriftify-kill-options topology-action-options))
+        :rebalance (.set_rebalance_options thrift-topology-action-options
+                                           (thriftify-rebalance-options topology-action-options))
+        nil)
+      thrift-topology-action-options)))
+
+(defn clojurify-topology-action-options
+  "Converts thrift TopologyActionOptions into the kill or rebalance options map,
+  preferring kill options when both are set. Returns nil for a nil argument and
+  false when neither member is set."
+  [^TopologyActionOptions topology-action-options]
+  (when topology-action-options
+    (cond
+      (.is_set_kill_options topology-action-options)
+      (clojurify-kill-options (.get_kill_options topology-action-options))
+
+      (.is_set_rebalance_options topology-action-options)
+      (clojurify-rebalance-options (.get_rebalance_options topology-action-options))
+
+      ;; neither member set: the result is false, not nil
+      :else false)))
+
+(defn clojurify-debugoptions
+  "Converts thrift DebugOptions into {:enable ... :samplingpct ...};
+  nil for a nil argument."
+  [^DebugOptions options]
+  (when options
+    {:enable (.is_enable options)
+     :samplingpct (.get_samplingpct options)}))
+
+(defn thriftify-debugoptions
+  "Converts a debug-options map into thrift DebugOptions; :enable defaults to
+  false and :samplingpct to 10 when the key is absent."
+  [options]
+  (let [enable (get options :enable false)
+        samplingpct (get options :samplingpct 10)]
+    (doto (DebugOptions.)
+      (.set_enable enable)
+      (.set_samplingpct samplingpct))))
+
+(defn thriftify-storm-base
+  "Converts a storm-base map/record into a thrift StormBase. Numeric fields are
+  coerced with int, statuses go through convert-to-status-from-symbol, and
+  per-component debug options through thriftify-debugoptions."
+  [storm-base]
+  (let [{:keys [storm-name launch-time-secs status num-workers owner prev-status]} storm-base
+        component->executors (map-val int (:component->executors storm-base))
+        component->debug (map-val thriftify-debugoptions (:component->debug storm-base))]
+    (doto (StormBase.)
+      (.set_name storm-name)
+      (.set_launch_time_secs (int launch-time-secs))
+      (.set_status (convert-to-status-from-symbol status))
+      (.set_num_workers (int num-workers))
+      (.set_component_executors component->executors)
+      (.set_owner owner)
+      (.set_topology_action_options (thriftify-topology-action-options storm-base))
+      (.set_prev_status (convert-to-status-from-symbol prev-status))
+      (.set_component_debug component->debug))))
+
+(defn clojurify-storm-base
+  "Builds an org.apache.storm.daemon.common.StormBase record from a thrift
+  StormBase; returns nil for a nil argument."
+  [^StormBase storm-base]
+  (when storm-base
+    (let [status (convert-to-symbol-from-status (.get_status storm-base))
+          prev-status (convert-to-symbol-from-status (.get_prev_status storm-base))
+          action-options (clojurify-topology-action-options
+                           (.get_topology_action_options storm-base))]
+      (org.apache.storm.daemon.common.StormBase.
+        (.get_name storm-base)
+        (.get_launch_time_secs storm-base)
+        status
+        (.get_num_workers storm-base)
+        (into {} (.get_component_executors storm-base))
+        (.get_owner storm-base)
+        action-options
+        prev-status
+        (map-val clojurify-debugoptions (.get_component_debug storm-base))))))
+
+(defn thriftify-stats
+  "Converts executor -> stats entries for thrift: each key becomes an
+  ExecutorInfo built from (int first) and (int last) of the key, each value
+  goes through thriftify-executor-stats. A nil/false argument yields {}."
+  [stats]
+  (if-not stats
+    {}
+    (->> stats
+         (map-key #(ExecutorInfo. (int (first %1)) (int (last %1))))
+         (map-val thriftify-executor-stats))))
+
+(defn clojurify-stats
+  "Converts thrift executor stats: each key becomes the list
+  (task-start task-end), each value goes through clojurify-executor-stats.
+  A nil/false argument yields {}."
+  [stats]
+  (if-not stats
+    {}
+    (->> stats
+         (map-key (fn [x] (list (.get_task_start x) (.get_task_end x))))
+         (map-val clojurify-executor-stats))))
+
+(defn clojurify-zk-worker-hb
+  "Converts a thrift ClusterWorkerHeartbeat into a map with :storm-id,
+  :executor-stats, :uptime and :time-secs; a nil argument yields {}."
+  [^ClusterWorkerHeartbeat worker-hb]
+  (if-not worker-hb
+    {}
+    (let [executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))]
+      {:storm-id (.get_storm_id worker-hb)
+       :executor-stats executor-stats
+       :uptime (.get_uptime_secs worker-hb)
+       :time-secs (.get_time_secs worker-hb)})))
+
+(defn thriftify-zk-worker-hb
+  "Converts a worker heartbeat map into a thrift ClusterWorkerHeartbeat, keeping
+  only :executor-stats entries whose value is truthy. Returns nil when no such
+  entry exists."
+  [worker-hb]
+  (let [present-stats (filter second (:executor-stats worker-hb))]
+    (when (seq present-stats)
+      (doto (ClusterWorkerHeartbeat.)
+        (.set_uptime_secs (:uptime worker-hb))
+        (.set_storm_id (:storm-id worker-hb))
+        (.set_executor_stats (thriftify-stats present-stats))
+        (.set_time_secs (:time-secs worker-hb))))))
+
+(defn clojurify-error
+  "Converts a thrift ErrorInfo into a map with :error, :time-secs, :host and
+  :port; nil for a nil argument."
+  [^ErrorInfo error]
+  (when error
+    {:error (.get_error error)
+     :time-secs (.get_error_time_secs error)
+     :host (.get_host error)
+     :port (.get_port error)}))
+
+(defn thriftify-error
+  "Converts an error map (:error, :time-secs, :host, :port) into a thrift
+  ErrorInfo."
+  [error]
+  (let [{message :error :keys [time-secs host port]} error]
+    (doto (ErrorInfo. message time-secs)
+      (.set_host host)
+      (.set_port port))))
+
+(defn clojurify-profile-request
+  "Converts a thrift ProfileRequest into a map with :host, :port (the first
+  port of its NodeInfo), :action and :timestamp; nil for a nil argument."
+  [^ProfileRequest request]
+  (when request
+    (let [node-info (.get_nodeInfo request)]
+      {:host (.get_node node-info)
+       :port (first (.get_port node-info))
+       :action (.get_action request)
+       :timestamp (.get_time_stamp request)})))
+
+(defn thriftify-profile-request
+  "Converts a profile-request map (:host, :port, :action, :timestamp) into a
+  thrift ProfileRequest whose NodeInfo carries the single port as a set."
+  [profile-request]
+  (let [{:keys [host port action timestamp]} profile-request
+        nodeinfo (doto (NodeInfo.)
+                   (.set_node host)
+                   (.set_port (set [port])))]
+    (doto (ProfileRequest. nodeinfo action)
+      (.set_time_stamp timestamp))))
+
+(defn thriftify-credentials
+  "Wraps `credentials` in a thrift Credentials object; a nil/false argument is
+  stored as an empty map."
+  [credentials]
+  (let [creds (or credentials {})]
+    (doto (Credentials.)
+      (.set_creds creds))))
+
+(defn clojurify-crdentials
+  "Copies the creds of a thrift Credentials object into a Clojure map;
+  nil for a nil argument."
+  [^Credentials credentials]
+  (when credentials
+    (->> (.get_creds credentials)
+         (into {}))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj 
b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
new file mode 100644
index 0000000..7c4d614
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -0,0 +1,107 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.acker
+  (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
+  (:import [org.apache.storm.tuple Tuple Fields])
+  (:import [org.apache.storm.utils RotatingMap MutableObject])
+  (:import [java.util List Map])
+  (:import [org.apache.storm Constants])
+  (:use [org.apache.storm config util log])
+  (:gen-class
+   :init init
+   :implements [org.apache.storm.task.IBolt]
+   :constructors {[] []}
+   :state state ))
+
+;; Component id string for the acker (not referenced elsewhere in this file).
+(def ACKER-COMPONENT-ID "__acker")
+;; Stream ids the acker bolt dispatches on in `execute`: init, ack and fail.
+;; The ack and fail ids are also the streams it emits on towards the spout task.
+(def ACKER-INIT-STREAM-ID "__ack_init")
+(def ACKER-ACK-STREAM-ID "__ack_ack")
+(def ACKER-FAIL-STREAM-ID "__ack_fail")
+
+(defn- update-ack
+  "Returns `curr-entry` with :val replaced by the XOR of its current :val
+  (0 when the key is absent) and `val`."
+  [curr-entry val]
+  (assoc curr-entry :val (bit-xor (get curr-entry :val 0) val)))
+
+(defn- acker-emit-direct
+  "Type-hinted wrapper around OutputCollector.emitDirect: sends `values` to
+  `task` on `stream`."
+  [^OutputCollector collector ^Integer task ^String stream ^List values]
+  (.emitDirect collector task stream values))
+
+;; Builds the acker as an IBolt. State lives in two MutableObjects that are
+;; filled in by `prepare`: the output collector and a RotatingMap of pending
+;; entries keyed by the tuple's first value. Each entry is a map that may hold
+;; :val (running XOR), :spout-task and :failed.
+(defn mk-acker-bolt []
+  (let [output-collector (MutableObject.)
+        pending (MutableObject.)]
+    (reify IBolt
+      ;; prepare: remember the collector and create a fresh RotatingMap
+      ;; (constructed with 2 -- presumably the bucket count; TODO confirm).
+      (^void prepare [this ^Map storm-conf ^TopologyContext context 
^OutputCollector collector]
+               (.setObject output-collector collector)
+               (.setObject pending (RotatingMap. 2))
+               )
+      ;; execute: tick tuples rotate the pending map; every other tuple
+      ;; updates the entry for its id and is then acked.
+      (^void execute [this ^Tuple tuple]
+             (let [^RotatingMap pending (.getObject pending)
+                   stream-id (.getSourceStreamId tuple)]
+               (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
+                 (.rotate pending)
+                 (let [id (.getValue tuple 0)
+                       ^OutputCollector output-collector (.getObject 
output-collector)
+                       curr (.get pending id)
+                       ;; init: XOR in value 1 and record the spout task (value 2);
+                       ;; ack: XOR in value 1; fail: mark the entry as failed.
+                       ;; condp has no default clause, so any other stream id
+                       ;; throws IllegalArgumentException.
+                       curr (condp = stream-id
+                                ACKER-INIT-STREAM-ID (-> curr
+                                                         (update-ack 
(.getValue tuple 1))
+                                                         (assoc :spout-task 
(.getValue tuple 2)))
+                                ACKER-ACK-STREAM-ID (update-ack curr 
(.getValue tuple 1))
+                                ACKER-FAIL-STREAM-ID (assoc curr :failed 
true))]
+                   (.put pending id curr)
+                   ;; Nothing is emitted until the entry knows its spout task,
+                   ;; i.e. until the init tuple for this id has been seen.
+                   (when (and curr (:spout-task curr))
+                     ;; :val of 0 wins over :failed: emit on the ack stream,
+                     ;; otherwise emit on the fail stream when the entry is failed.
+                     (cond (= 0 (:val curr))
+                           (do
+                             (.remove pending id)
+                             (acker-emit-direct output-collector
+                                                (:spout-task curr)
+                                                ACKER-ACK-STREAM-ID
+                                                [id]
+                                                ))
+                           (:failed curr)
+                           (do
+                             (.remove pending id)
+                             (acker-emit-direct output-collector
+                                                (:spout-task curr)
+                                                ACKER-FAIL-STREAM-ID
+                                                [id]
+                                                ))
+                           ))
+                   (.ack output-collector tuple)
+                   ))))
+      ;; cleanup: nothing to release.
+      (^void cleanup [this]
+        )
+      )))
+
+(defn -init
+  "gen-class :init function: passes no arguments to the superclass constructor
+  and uses a fresh (container) as the instance state."
+  []
+  (let [state (container)]
+    [[] state]))
+
+(defn -prepare
+  "Creates a new acker bolt via mk-acker-bolt, stores it in this instance's
+  state container and forwards prepare to it."
+  [this conf context collector]
+  (let [^IBolt delegate (mk-acker-bolt)
+        state (.state ^org.apache.storm.daemon.acker this)]
+    (container-set! state delegate)
+    (.prepare delegate conf context collector)))
+
+(defn -execute
+  "Forwards `tuple` to the bolt stored in this instance's state container."
+  [this tuple]
+  (let [state (.state ^org.apache.storm.daemon.acker this)
+        ^IBolt delegate (container-get state)]
+    (.execute delegate tuple)))
+
+(defn -cleanup
+  "Forwards cleanup to the bolt stored in this instance's state container."
+  [this]
+  (let [state (.state ^org.apache.storm.daemon.acker this)
+        ^IBolt delegate (container-get state)]
+    (.cleanup delegate)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj 
b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
new file mode 100644
index 0000000..14d0132
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
@@ -0,0 +1,98 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.builtin-metrics
+  (:import [org.apache.storm.metric.api CountMetric StateMetric IMetric 
IStatefulObject])
+  (:import [org.apache.storm.metric.internal MultiCountStatAndMetric 
MultiLatencyStatAndMetric])
+  (:import [org.apache.storm Config])
+  (:use [org.apache.storm.stats]))
+
+(defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count  ; spout metrics; make-data fills these five fields positionally for :spout
+                                ^MultiLatencyStatAndMetric complete-latency
+                                ^MultiCountStatAndMetric fail-count
+                                ^MultiCountStatAndMetric emit-count
+                                ^MultiCountStatAndMetric transfer-count])
+(defrecord BuiltinBoltMetrics [^MultiCountStatAndMetric ack-count  ; bolt metrics; make-data fills these seven fields positionally for :bolt
+                               ^MultiLatencyStatAndMetric process-latency
+                               ^MultiCountStatAndMetric fail-count
+                               ^MultiCountStatAndMetric execute-count
+                               ^MultiLatencyStatAndMetric execute-latency
+                               ^MultiCountStatAndMetric emit-count
+                               ^MultiCountStatAndMetric transfer-count])
+(defrecord SpoutThrottlingMetrics [^CountMetric skipped-max-spout  ; three counters, created by make-spout-throttling-data and incremented by the skipped-*! fns
+                                   ^CountMetric skipped-throttle
+                                   ^CountMetric skipped-inactive])
+
+
+(defn make-data
+  "Builds the built-in metrics record for an executor from its stats object:
+   a BuiltinSpoutMetrics for :spout, a BuiltinBoltMetrics for :bolt, each field
+   read through the matching stats-* accessor. Any other executor-type throws
+   IllegalArgumentException, because there is no default clause."
+  [executor-type stats]
+  (case executor-type
+    :spout (new BuiltinSpoutMetrics
+                (stats-acked stats)
+                (stats-complete-latencies stats)
+                (stats-failed stats)
+                (stats-emitted stats)
+                (stats-transferred stats))
+    :bolt (new BuiltinBoltMetrics
+               (stats-acked stats)
+               (stats-process-latencies stats)
+               (stats-failed stats)
+               (stats-executed stats)
+               (stats-execute-latencies stats)
+               (stats-emitted stats)
+               (stats-transferred stats))))
+
+(defn make-spout-throttling-data
+  "Creates a SpoutThrottlingMetrics whose three fields are new CountMetric
+   instances."
+  []
+  (let [skipped-max-spout (CountMetric.)
+        skipped-throttle (CountMetric.)
+        skipped-inactive (CountMetric.)]
+    (new SpoutThrottlingMetrics skipped-max-spout skipped-throttle skipped-inactive)))
+
+(defn register-spout-throttling-metrics
+  "Registers every [keyword metric] entry of throttling-metrics with
+   topology-context under the name \"__\" + the keyword's name, using the value
+   stored in storm-conf under Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS
+   (coerced to int) as the last argument."
+  [throttling-metrics storm-conf topology-context]
+  (doseq [[kw imetric] throttling-metrics
+          :let [metric-name (str "__" (name kw))]]
+    (.registerMetric topology-context
+                     metric-name
+                     imetric
+                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
+
+(defn register-all
+  "Registers every [keyword metric] entry of builtin-metrics with
+   topology-context under the name \"__\" + the keyword's name, using the value
+   stored in storm-conf under Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS
+   (coerced to int) as the last argument."
+  [builtin-metrics storm-conf topology-context]
+  (doseq [[kw imetric] builtin-metrics
+          :let [metric-name (str "__" (name kw))]]
+    (.registerMetric topology-context
+                     metric-name
+                     imetric
+                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
+
+(defn register-iconnection-server-metric
+  "When server is an IStatefulObject, wraps it in a StateMetric and registers
+   that as \"__recv-iconnection\" on topology-context. Otherwise does nothing
+   and returns nil."
+  [server storm-conf topology-context]
+  (when (instance? IStatefulObject server)
+    (let [state-metric (StateMetric. server)]
+      (.registerMetric topology-context
+                       "__recv-iconnection"
+                       state-metric
+                       (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))))
+
+(defn register-iconnection-client-metrics
+  "Registers \"__send-iconnection\" on topology-context. Each time the metric
+   is read it dereferences node+port->socket-ref and returns a map from
+   node+port to (.getState connection), keeping only the connections that are
+   IStatefulObject instances."
+  [node+port->socket-ref storm-conf topology-context]
+  (let [send-metric (reify IMetric
+                      (^Object getValueAndReset [this]
+                        (into {}
+                              (for [[node+port connection] @node+port->socket-ref
+                                    :when (instance? IStatefulObject connection)]
+                                [node+port (.getState ^IStatefulObject connection)]))))]
+    (.registerMetric topology-context
+                     "__send-iconnection"
+                     send-metric
+                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
+ 
+(defn register-queue-metrics
+  "For every [qname q] entry of queues, registers a StateMetric wrapping q on
+   topology-context under the name \"__\" + (name qname)."
+  [queues storm-conf topology-context]
+  (doseq [[qname q] queues
+          :let [metric-name (str "__" (name qname))]]
+    (.registerMetric topology-context
+                     metric-name
+                     (StateMetric. q)
+                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
+
+(defn skipped-max-spout!
+  "Increments m's skipped-max-spout counter by (stats-rate stats)."
+  [^SpoutThrottlingMetrics m stats]
+  (let [counter (.skipped-max-spout m)]
+    (.incrBy counter (stats-rate stats))))
+
+(defn skipped-throttle!
+  "Increments m's skipped-throttle counter by (stats-rate stats)."
+  [^SpoutThrottlingMetrics m stats]
+  (let [counter (.skipped-throttle m)]
+    (.incrBy counter (stats-rate stats))))
+
+(defn skipped-inactive!
+  "Increments m's skipped-inactive counter by (stats-rate stats)."
+  [^SpoutThrottlingMetrics m stats]
+  (let [counter (.skipped-inactive m)]
+    (.incrBy counter (stats-rate stats))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj 
b/storm-core/src/clj/org/apache/storm/daemon/common.clj
new file mode 100644
index 0000000..dd761a5
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -0,0 +1,402 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.common
+  (:use [org.apache.storm log config util])
+  (:import [org.apache.storm.generated StormTopology
+            InvalidTopologyException GlobalStreamId]
+           [org.apache.storm.utils ThriftTopologyUtils])
+  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.task WorkerTopologyContext])
+  (:import [org.apache.storm Constants])
+  (:import [org.apache.storm.metric SystemBolt])
+  (:import [org.apache.storm.metric EventLoggerBolt])
+  (:import [org.apache.storm.security.auth IAuthorizer]) 
+  (:import [java.io InterruptedIOException])
+  (:require [clojure.set :as set])  
+  (:require [org.apache.storm.daemon.acker :as acker])
+  (:require [org.apache.storm.thrift :as thrift])
+  (:require [metrics.reporters.jmx :as jmx]))
+
+(defn start-metrics-reporters
+  "Creates a JMX reporter from an empty options map and starts it."
+  []
+  (-> {}
+      jmx/reporter
+      jmx/start))
+
+(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)  ; the four ACKER-* vars alias the values defined in the acker namespace
+(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
+(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
+(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
+
+(def SYSTEM-STREAM-ID "__system")  ; stream that add-system-streams! declares on every component
+
+(def EVENTLOGGER-COMPONENT-ID "__eventlogger")  ; id under which add-eventlogger! registers its bolt
+(def EVENTLOGGER-STREAM-ID "__eventlog")  ; stream that add-eventlogger! declares on every component
+
+(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)  ; the remaining ids are read from org.apache.storm.Constants
+(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
+(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
+(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
+(def CREDENTIALS-CHANGED-STREAM-ID Constants/CREDENTIALS_CHANGED_STREAM_ID)
+
+;; the task id is the virtual port
+;; node->host is here so that tasks know who to talk to just from assignment
+;; this avoids the situation where a node goes down and a task doesn't know
+;; what to do information-wise
+(defrecord Assignment [master-code-dir node->host executor->node+port 
executor->start-time-secs worker->resources])  ; NOTE(review): executor->node+port presumably has the shape to-task->node+port consumes ([first-task last-task] -> node+port) -- confirm
+
+
+;; component->executors is a map from spout/bolt id to number of executors for 
that component
+(defrecord StormBase [storm-name launch-time-secs status num-workers 
component->executors owner topology-action-options prev-status 
component->debug])  ; get-storm-id matches topologies on the :storm-name field
+
+(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta 
scheduler-meta uptime-secs version resources-map])  ; plain data record: nine unhinted, positional fields
+
+(defprotocol DaemonCommon  ; single-method protocol
+  (waiting? [this]))  ; takes only the implementing object; semantics are defined by the implementers
+
+(defrecord ExecutorStats [^long processed  ; five counters, all hinted as primitive longs
+                          ^long acked
+                          ^long emitted
+                          ^long transferred
+                          ^long failed])  ; new-executor-stats builds an instance with every counter at 0
+
+(defn new-executor-stats
+  "Returns an ExecutorStats with all five counters set to 0."
+  []
+  (new ExecutorStats 0 0 0 0 0))
+
+(defn get-storm-id
+  "Returns the first id among the cluster state's active storms whose storm
+   base has :storm-name equal to storm-name, via find-first."
+  [storm-cluster-state storm-name]
+  (letfn [(named? [storm-id]
+            (= storm-name
+               (:storm-name (.storm-base storm-cluster-state storm-id nil))))]
+    (find-first named? (.active-storms storm-cluster-state))))
+
+(defn topology-bases
+  "Returns a map from each active storm id to the storm base read for it from
+   storm-cluster-state."
+  [storm-cluster-state]
+  (reduce (fn [bases id]
+            (assoc bases id (.storm-base storm-cluster-state id nil)))
+          {}
+          (.active-storms storm-cluster-state)))
+
+(defn validate-distributed-mode!
+  "Throws IllegalArgumentException when conf describes local mode; otherwise
+   returns nil."
+  [conf]
+  (when (local-mode? conf)
+    (throw (IllegalArgumentException. "Cannot start server in local mode!"))))
+
+(defmacro defserverfn [name & body]  ; expands to (defn name [& args] ...) that applies (fn ~@body) to the args inside try-cause
+  `(let [exec-fn# (fn ~@body)]
+    (defn ~name [& args#]
+      (try-cause
+        (apply exec-fn# args#)
+      (catch InterruptedIOException e#  ; both interruption types are rethrown unchanged
+        (throw e#))
+      (catch InterruptedException e#
+        (throw e#))
+      (catch Throwable t#  ; any other Throwable: log it, then call exit-process! with code 13
+        (log-error t# "Error on initialization of server " ~(str name))
+        (exit-process! 13 "Error on initialization")
+        )))))
+
+(defn- validate-ids!
+  "Throws InvalidTopologyException when any-intersection over the values of all
+   thrift/STORM-TOPOLOGY-FIELDS is non-empty (duplicate component ids), or
+   when, in any field that is not a worker hook, a component id or one of a
+   component's declared stream ids satisfies Utils/isSystemId. Returns nil
+   otherwise."
+  [^StormTopology topology]
+  (let [field-values (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
+        duplicated (apply any-intersection field-values)]
+    (when-not (empty? duplicated)
+      (throw (InvalidTopologyException.
+              (str "Duplicate component ids: " duplicated))))
+    (doseq [field thrift/STORM-TOPOLOGY-FIELDS
+            :let [components (.getFieldValue topology field)]]
+      (when-not (ThriftTopologyUtils/isWorkerHook field)
+        ;; component ids first ...
+        (doseq [component-id (keys components)]
+          (when (Utils/isSystemId component-id)
+            (throw (InvalidTopologyException.
+                     (str component-id " is not a valid component id")))))
+        ;; ... then the stream ids each component declares
+        (doseq [component (vals components)
+                stream-id (-> component .get_common .get_streams keys)]
+          (when (Utils/isSystemId stream-id)
+            (throw (InvalidTopologyException.
+                     (str stream-id " is not a valid stream id")))))))))
+
+(defn all-components
+  "Merges into one map the values of every thrift/STORM-TOPOLOGY-FIELDS field
+   of topology that is not a worker hook. Returns {} when there is nothing to
+   merge."
+  [^StormTopology topology]
+  (->> thrift/STORM-TOPOLOGY-FIELDS
+       (remove #(ThriftTopologyUtils/isWorkerHook %))
+       (map #(.getFieldValue topology %))
+       (apply merge {})))
+
+(defn component-conf
+  "Parses the JSON conf stored on the component's common section with
+   from-json and returns the result."
+  [component]
+  (from-json (.get_json_conf (.get_common component))))
+
+(defn validate-basic!
+  "Runs validate-ids!, then throws InvalidTopologyException when a spout
+   declares any inputs, or when a component has TOPOLOGY-TASKS greater than 0
+   together with a parallelism hint that is present and not greater than 0."
+  [^StormTopology topology]
+  (validate-ids! topology)
+  (doseq [spout-field thrift/SPOUT-FIELDS
+          spout (vals (.getFieldValue topology spout-field))]
+    (when-not (empty? (.get_inputs (.get_common spout)))
+      (throw (InvalidTopologyException. "May not declare inputs for a 
spout"))))
+  (doseq [[_ component] (all-components topology)
+          :let [component-config (component-conf component)
+                hint (thrift/parallelism-hint (.get_common component))]]
+    (when (and (> (component-config TOPOLOGY-TASKS) 0)
+               hint
+               (<= hint 0))
+      (throw (InvalidTopologyException. "Number of executors must be greater 
than 0 when number of tasks is greater than 0")))))
+
+(defn validate-structure! [^StormTopology topology]  ; throws InvalidTopologyException on the first invalid subscription found
+  ;; validate all the component subscribe from component+stream which actually 
exists in the topology
+  ;; and if it is a fields grouping, validate the corresponding field exists  
+  (let [all-components (all-components topology)]  ; this local shadows the all-components fn for the rest of the body
+    (doseq [[id comp] all-components
+            :let [inputs (.. comp get_common get_inputs)]]
+      (doseq [[global-stream-id grouping] inputs
+              :let [source-component-id (.get_componentId global-stream-id)
+                    source-stream-id    (.get_streamId global-stream-id)]]
+        (if-not (contains? all-components source-component-id)  ; check 1: the source component must exist
+          (throw (InvalidTopologyException. (str "Component: [" id "] 
subscribes from non-existent component [" source-component-id "]")))
+          (let [source-streams (-> all-components (get source-component-id) 
.get_common .get_streams)]
+            (if-not (contains? source-streams source-stream-id)  ; check 2: the source component must declare the stream
+              (throw (InvalidTopologyException. (str "Component: [" id "] 
subscribes from non-existent stream: [" source-stream-id "] of component [" 
source-component-id "]")))
+              (if (= :fields (thrift/grouping-type grouping))  ; check 3, fields groupings only: every grouping field must be an output field of that stream
+                (let [grouping-fields (set (.get_fields grouping))
+                      source-stream-fields (-> source-streams (get 
source-stream-id) .get_output_fields set)
+                      diff-fields (set/difference grouping-fields 
source-stream-fields)]
+                  (when-not (empty? diff-fields)
+                    (throw (InvalidTopologyException. (str "Component: [" id 
"] subscribes from stream: [" source-stream-id "] of component [" 
source-component-id "] with non-existent fields: " diff-fields)))))))))))))
+
+(defn acker-inputs
+  "Builds the acker's input declaration: a map from [component-id stream-id]
+   to [\"id\"]. Every spout contributes its ACKER-INIT-STREAM-ID; every bolt
+   contributes its ACKER-ACK-STREAM-ID and ACKER-FAIL-STREAM-ID. Returns nil
+   when the topology has neither spouts nor bolts."
+  [^StormTopology topology]
+  (let [spout-ids (-> topology .get_spouts .keySet)
+        bolt-ids (-> topology .get_bolts .keySet)
+        from-spouts (apply merge
+                           (map (fn [id] {[id ACKER-INIT-STREAM-ID] ["id"]})
+                                spout-ids))
+        from-bolts (apply merge
+                          (map (fn [id] {[id ACKER-ACK-STREAM-ID] ["id"]
+                                         [id ACKER-FAIL-STREAM-ID] ["id"]})
+                               bolt-ids))]
+    (merge from-spouts from-bolts)))
+
+;; The event logger receives input from all the spouts and bolts, with a
+;; fields grouping on component id, so that all tuples from one component go
+;; to the same executor and can be viewed via logviewer.
+(defn eventlogger-inputs
+  "Returns a map from [component-id EVENTLOGGER-STREAM-ID] to
+   [\"component-id\"] with one entry per spout and per bolt of topology, or nil
+   when the topology has neither."
+  [^StormTopology topology]
+  (let [spout-ids (-> topology .get_spouts .keySet)
+        bolt-ids (-> topology .get_bolts .keySet)]
+    (apply merge
+           (for [id (concat spout-ids bolt-ids)]
+             {[id EVENTLOGGER-STREAM-ID] ["component-id"]}))))
+
+(defn add-acker!
+  "Mutates topology `ret` in place to wire in the acker:
+   - builds the acker bolt spec from (acker-inputs ret), with
+     TOPOLOGY-ACKER-EXECUTORS executors and tasks (TOPOLOGY-WORKERS when that
+     setting is nil) and a tick frequency of TOPOLOGY-MESSAGE-TIMEOUT-SECS;
+   - declares the ack and fail streams on every bolt already in `ret`;
+   - on every spout, sets the same tick frequency in its JSON conf, declares
+     the init stream and subscribes it (direct grouping) to the acker's ack
+     and fail streams;
+   - finally registers the acker bolt under ACKER-COMPONENT-ID.
+   Returns whatever .put_to_bolts returns."
+  [storm-conf ^StormTopology ret]
+  (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS))
+                        (storm-conf TOPOLOGY-WORKERS)
+                        (storm-conf TOPOLOGY-ACKER-EXECUTORS))
+        acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
+                                         (new org.apache.storm.daemon.acker)
+                                         {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
+                                          ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])}
+                                         :p num-executors
+                                         :conf {TOPOLOGY-TASKS num-executors
+                                                TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
+    ;; doseq, not dofor: these loops run only for their side effects, so there
+    ;; is no reason to build and discard a result list.
+    (doseq [[_ bolt] (.get_bolts ret)
+            :let [common (.get_common bolt)]]
+      (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
+      (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"])))
+    (doseq [[_ spout] (.get_spouts ret)
+            :let [common (.get_common spout)
+                  spout-conf (merge
+                               (component-conf spout)
+                               {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
+      ;; this sets up tick tuples to cause timeouts to be triggered
+      (.set_json_conf common (to-json spout-conf))
+      (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
+      (.put_to_inputs common
+                      (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
+                      (thrift/mk-direct-grouping))
+      (.put_to_inputs common
+                      (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
+                      (thrift/mk-direct-grouping)))
+    ;; Register under the same constant the spout inputs above subscribe to,
+    ;; instead of a separate "__acker" literal that could drift from it. The
+    ;; bolt is added last so the bolt loop above never touches the acker itself.
+    (.put_to_bolts ret ACKER-COMPONENT-ID acker-bolt)))
+
+(defn add-metric-streams!
+  "Declares METRICS-STREAM-ID, with output fields \"task-info\" and
+   \"data-points\", on every component of topology. A new stream-info object is
+   built for each component."
+  [^StormTopology topology]
+  (doseq [component (vals (all-components topology))
+          :let [stream-info (thrift/output-fields ["task-info" "data-points"])]]
+    (.put_to_streams (.get_common component) METRICS-STREAM-ID stream-info)))
+
+(defn add-system-streams!
+  "Declares SYSTEM-STREAM-ID, with the single output field \"event\", on every
+   component of topology. A new stream-info object is built for each
+   component."
+  [^StormTopology topology]
+  (doseq [component (vals (all-components topology))
+          :let [stream-info (thrift/output-fields ["event"])]]
+    (.put_to_streams (.get_common component) SYSTEM-STREAM-ID stream-info)))
+
+
+(defn map-occurrences
+  "Walks coll in order and calls (afn x n) for each element x, where n is the
+   number of times x has been seen so far, this occurrence included (so 1 on
+   the first sighting). Returns the results as a list in the original order."
+  [afn coll]
+  (loop [remaining (seq coll)
+         seen {}
+         newest-first ()]
+    (if remaining
+      (let [x (first remaining)
+            n (inc (get seen x 0))]
+        (recur (next remaining)
+               (assoc seen x n)
+               (conj newest-first (afn x n))))
+      (reverse newest-first))))
+
+(defn number-duplicates
+  "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]
+   The first occurrence of a value is kept as is; the n-th occurrence (n >= 2)
+   becomes the value followed by \"#\" and n."
+  [coll]
+  (letfn [(tag [x occurrence]
+            (if (< occurrence 2)
+              x
+              (str x "#" occurrence)))]
+    (map-occurrences tag coll)))
+
+(defn metrics-consumer-register-ids
+  "Generates a list of component ids, one per registered metrics consumer,
+   e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..]. Each id is
+   Constants/METRICS_COMPONENT_ID_PREFIX followed by the consumer's \"class\"
+   entry, with repeated classes disambiguated by number-duplicates."
+  [storm-conf]
+  (let [registrations (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
+        class-names (map #(get % "class") registrations)]
+    (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %)
+         (number-duplicates class-names))))
+
+(defn metrics-consumer-bolt-specs
+  "Returns a lazy seq of [component-id bolt-spec] pairs, one per entry of
+   TOPOLOGY-METRICS-CONSUMER-REGISTER in storm-conf. Every spec wraps a
+   MetricsConsumerBolt built from the entry's \"class\" and \"argument\",
+   shuffle-subscribes to METRICS-STREAM-ID of the system component and of
+   every component in topology, and uses \"parallelism.hint\" (default 1) for
+   both parallelism and TOPOLOGY-TASKS."
+  [storm-conf topology]
+  (let [emitting-ids (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
+        inputs (into {}
+                     (map (fn [id] [[id METRICS-STREAM-ID] :shuffle])
+                          emitting-ids))
+        registrations (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)]
+    (letfn [(consumer-spec [register]
+              (let [parallelism (or (get register "parallelism.hint") 1)]
+                (thrift/mk-bolt-spec*
+                 inputs
+                 (org.apache.storm.metric.MetricsConsumerBolt. (get register "class")
+                                                               (get register "argument"))
+                 {} :p parallelism :conf {TOPOLOGY-TASKS parallelism})))]
+      (map (fn [component-id register]
+             [component-id (consumer-spec register)])
+           (metrics-consumer-register-ids storm-conf)
+           registrations))))
+
+(defn eventlogger-bolt-fields
+  "Returns the fields that the event logger bolt expects, as a vector in this
+   order: component id, message id, timestamp, values."
+  []
+  (vector (EventLoggerBolt/FIELD_COMPONENT_ID)
+          (EventLoggerBolt/FIELD_MESSAGE_ID)
+          (EventLoggerBolt/FIELD_TS)
+          (EventLoggerBolt/FIELD_VALUES)))
+
+(defn add-eventlogger!
+  "Mutates topology `ret` in place: declares EVENTLOGGER-STREAM-ID (with the
+   fields from eventlogger-bolt-fields) on every existing component, then adds
+   an EventLoggerBolt under EVENTLOGGER-COMPONENT-ID. The bolt gets
+   TOPOLOGY-EVENTLOGGER-EXECUTORS executors and tasks (TOPOLOGY-WORKERS when
+   that setting is nil) and ticks every TOPOLOGY-MESSAGE-TIMEOUT-SECS."
+  [storm-conf ^StormTopology ret]
+  (let [configured (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)
+        num-executors (if (nil? configured)
+                        (storm-conf TOPOLOGY-WORKERS)
+                        configured)
+        bolt-conf {TOPOLOGY-TASKS num-executors
+                   TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)}
+        eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret)
+                                               (EventLoggerBolt.)
+                                               {}
+                                               :p num-executors
+                                               :conf bolt-conf)]
+    (doseq [component (vals (all-components ret))]
+      (.put_to_streams (.get_common component)
+                       EVENTLOGGER-STREAM-ID
+                       (thrift/output-fields (eventlogger-bolt-fields))))
+    (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)))
+
+(defn add-metric-components!
+  "Adds every [component-id bolt-spec] pair produced by
+   metrics-consumer-bolt-specs to topology as a bolt."
+  [storm-conf ^StormTopology topology]
+  (let [consumer-specs (metrics-consumer-bolt-specs storm-conf topology)]
+    (doseq [[comp-id bolt-spec] consumer-specs]
+      (.put_to_bolts topology comp-id bolt-spec))))
+
+(defn add-system-components!
+  "Adds a SystemBolt to topology under SYSTEM-COMPONENT-ID. The bolt has no
+   inputs, parallelism 0 and TOPOLOGY-TASKS 0, and declares the system tick,
+   metrics tick and credentials-changed streams. `conf` is not read."
+  [conf ^StormTopology topology]
+  (let [system-bolt (SystemBolt.)
+        output-streams {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
+                        METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])
+                        CREDENTIALS-CHANGED-STREAM-ID (thrift/output-fields ["creds"])}
+        system-bolt-spec (thrift/mk-bolt-spec* {}
+                                               system-bolt
+                                               output-streams
+                                               :p 0
+                                               :conf {TOPOLOGY-TASKS 0})]
+    (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
+
+(defn system-topology!
+  "Validates topology with validate-basic!, then returns a deep copy extended,
+   in this order, with the acker, the event logger, the metrics consumer bolts,
+   the system bolt, the metric streams and the system streams. The copy is
+   checked with validate-structure! before it is returned; the argument itself
+   is not modified by the add-* steps."
+  [storm-conf ^StormTopology topology]
+  (validate-basic! topology)
+  (let [system-topology (.deepCopy topology)]
+    (add-acker! storm-conf system-topology)
+    (add-eventlogger! storm-conf system-topology)
+    (add-metric-components! storm-conf system-topology)
+    (add-system-components! storm-conf system-topology)
+    (doto system-topology
+      add-metric-streams!
+      add-system-streams!
+      validate-structure!)))
+
+(defn has-ackers?
+  "True when TOPOLOGY-ACKER-EXECUTORS is nil in storm-conf or greater than 0."
+  [storm-conf]
+  (let [acker-executors (storm-conf TOPOLOGY-ACKER-EXECUTORS)]
+    (or (nil? acker-executors)
+        (> acker-executors 0))))
+
+(defn has-eventloggers?
+  "True when TOPOLOGY-EVENTLOGGER-EXECUTORS is nil in storm-conf or greater
+   than 0."
+  [storm-conf]
+  (let [eventlogger-executors (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)]
+    (or (nil? eventlogger-executors)
+        (> eventlogger-executors 0))))
+
+(defn num-start-executors
+  "Returns the parallelism hint stored on the component's common section."
+  [component]
+  (-> component
+      .get_common
+      thrift/parallelism-hint))
+
+(defn storm-task-info
+  "Returns map from task -> component id. Components of the system topology
+   are sorted by id and each one receives TOPOLOGY-TASKS consecutive task ids;
+   task ids are Integers counting up from 1."
+  [^StormTopology user-topology storm-conf]
+  (let [tasks-of (comp #(get % TOPOLOGY-TASKS) component-conf)
+        component->num-tasks (map-val tasks-of
+                                      (all-components (system-topology! storm-conf user-topology)))
+        task-owners (mapcat (fn [[component-id num-tasks]]
+                              (repeat num-tasks component-id))
+                            (sort-by first component->num-tasks))
+        task-ids (iterate (comp int inc) (int 1))]
+    (into {} (map vector task-ids task-owners))))
+
+(defn executor-id->tasks
+  "Expands an executor id, given as [first-task-id last-task-id], into the lazy
+   sequence of every task id in that inclusive range, each coerced with int."
+  [[first-task-id last-task-id]]
+  (for [task-id (range first-task-id (inc last-task-id))]
+    (int task-id)))
+
+(defn worker-context
+  "Builds a WorkerTopologyContext from the entries of the worker map. The two
+   path arguments are derived from (:conf worker): the storm resources path
+   under the supervisor's stormdist root for :storm-id, and the pids root for
+   :worker-id."
+  [worker]
+  (let [daemon-conf (:conf worker)
+        storm-id (:storm-id worker)
+        resources-path (supervisor-storm-resources-path
+                         (supervisor-stormdist-root daemon-conf storm-id))
+        pids-root (worker-pids-root daemon-conf (:worker-id worker))]
+    (WorkerTopologyContext. (:system-topology worker)
+                            (:storm-conf worker)
+                            (:task->component worker)
+                            (:component->sorted-tasks worker)
+                            (:component->stream->fields worker)
+                            storm-id
+                            resources-path
+                            pids-root
+                            (:port worker)
+                            (:task-ids worker)
+                            (:default-shared-resources worker)
+                            (:user-shared-resources worker))))
+
+
+(defn to-task->node+port
+  "Expands a map from executor id to node+port into a map from each task id
+   of that executor (see executor-id->tasks) to the same node+port."
+  [executor->node+port]
+  (into {}
+        (for [[executor node+port] executor->node+port
+              task (executor-id->tasks executor)]
+          [task node+port])))
+
+(defn mk-authorization-handler
+  "Loads the class named klassname, instantiates it with .newInstance, calls
+   .prepare on it (as an IAuthorizer) with conf, and returns the instance.
+   Returns nil when klassname is nil. The class name, class and handler are
+   logged at debug level either way."
+  [klassname conf]
+  (let [azn-class (when klassname (Class/forName klassname))
+        azn-handler (when azn-class (.newInstance azn-class))]
+    (when azn-handler
+      (.prepare ^IAuthorizer azn-handler conf))
+    (log-debug "authorization class name:" klassname
+               " class:" azn-class
+               " handler:" azn-handler)
+    azn-handler))
+

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj 
b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
new file mode 100644
index 0000000..d6f77c3
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -0,0 +1,274 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.daemon.drpc
+  (:import [org.apache.storm.security.auth AuthUtils ThriftServer 
ThriftConnectionType ReqContext])
+  (:import [org.apache.storm.security.auth.authorizer DRPCAuthorizerBase])
+  (:import [org.apache.storm.generated DistributedRPC DistributedRPC$Iface 
DistributedRPC$Processor
+            DRPCRequest DRPCExecutionException DistributedRPCInvocations 
DistributedRPCInvocations$Iface
+            DistributedRPCInvocations$Processor])
+  (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
+            ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
+  (:import [org.apache.storm.daemon Shutdownable])
+  (:import [java.net InetAddress])
+  (:import [org.apache.storm.generated AuthorizationException]
+           [org.apache.storm.utils VersionInfo])
+  (:use [org.apache.storm config log util])
+  (:use [org.apache.storm.daemon common])
+  (:use [org.apache.storm.ui helpers])
+  (:use compojure.core)
+  (:use ring.middleware.reload)
+  (:require [compojure.handler :as handler])
+  (:require [metrics.meters :refer [defmeter mark!]])
+  (:gen-class))
+
+(defmeter drpc:num-execute-http-requests)  ; marked once per call of webapp itself
+(defmeter drpc:num-execute-calls)  ; the remaining meters are each marked at the top of the service-handler method they are named after
+(defmeter drpc:num-result-calls)
+(defmeter drpc:num-failRequest-calls)
+(defmeter drpc:num-fetchRequest-calls)
+(defmeter drpc:num-shutdown-calls)
+
+(def STORM-VERSION (VersionInfo/getVersion))
+
+(defn timeout-check-secs [] 5)  ; constant 5; service-handler's clear-thread loop fn returns this value after each sweep
+
+(defn acquire-queue
+  "Returns the queue stored under `function` in the map held by queues-atom,
+   first adding a new ConcurrentLinkedQueue under that key when the map has no
+   truthy entry for it."
+  [queues-atom function]
+  (letfn [(ensure-queue [queues]
+            (if (queues function)
+              queues
+              (assoc queues function (ConcurrentLinkedQueue.))))]
+    (swap! queues-atom ensure-queue)
+    ((deref queues-atom) function)))
+
+(defn check-authorization
+  "Logs the access through log-thrift-access when context is non-nil. Then,
+   when aclHandler is set, asks it to permit `operation` with `mapping` for
+   the context (falling back to (ReqContext/context) when context is nil) and
+   throws AuthorizationException if it refuses. The three-argument arity uses
+   (ReqContext/context) as the context."
+  ([aclHandler mapping operation context]
+    (when (not-nil? context)
+      (log-thrift-access (.requestID context)
+                         (.remoteAddress context)
+                         (.principal context)
+                         operation))
+    (when aclHandler
+      (let [effective-context (or context (ReqContext/context))]
+        (when-not (.permit aclHandler effective-context operation mapping)
+          (let [principal (.principal effective-context)
+                user (if principal (.getName principal) "unknown")]
+            (throw (AuthorizationException.
+                     (str "DRPC request '" operation "' for '"
+                          user "' user is not authorized"))))))))
+  ([aclHandler mapping operation]
+    (check-authorization aclHandler mapping operation (ReqContext/context))))
+
+;; TODO: change this to use TimeCacheMap
+(defn service-handler [conf]
+  (let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
+        ctr (atom 0)
+        id->sem (atom {})
+        id->result (atom {})
+        id->start (atom {})
+        id->function (atom {})
+        id->request (atom {})
+        request-queues (atom {})
+        cleanup (fn [id] (swap! id->sem dissoc id)
+                  (swap! id->result dissoc id)
+                  (swap! id->function dissoc id)
+                  (swap! id->request dissoc id)
+                  (swap! id->start dissoc id))
+        my-ip (.getHostAddress (InetAddress/getLocalHost))
+        clear-thread (async-loop
+                       (fn []
+                         (doseq [[id start] @id->start]
+                           (when (> (time-delta start) (conf 
DRPC-REQUEST-TIMEOUT-SECS))
+                             (when-let [sem (@id->sem id)]
+                               (.remove (acquire-queue request-queues 
(@id->function id)) (@id->request id))
+                               (log-warn "Timeout DRPC request id: " id " 
start at " start)
+                               (.release sem))
+                             (cleanup id)))
+                         (timeout-check-secs)))]
+    (reify DistributedRPC$Iface
+      (^String execute
+        [this ^String function ^String args]
+        (mark! drpc:num-execute-calls)
+        (log-debug "Received DRPC request for " function " (" args ") at " 
(System/currentTimeMillis))
+        (check-authorization drpc-acl-handler
+                             {DRPCAuthorizerBase/FUNCTION_NAME function}
+                             "execute")
+        (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
+              ^Semaphore sem (Semaphore. 0)
+              req (DRPCRequest. args id)
+              ^ConcurrentLinkedQueue queue (acquire-queue request-queues 
function)]
+          (swap! id->start assoc id (current-time-secs))
+          (swap! id->sem assoc id sem)
+          (swap! id->function assoc id function)
+          (swap! id->request assoc id req)
+          (.add queue req)
+          (log-debug "Waiting for DRPC result for " function " " args " at " 
(System/currentTimeMillis))
+          (.acquire sem)
+          (log-debug "Acquired DRPC result for " function " " args " at " 
(System/currentTimeMillis))
+          (let [result (@id->result id)]
+            (cleanup id)
+            (log-debug "Returning DRPC result for " function " " args " at " 
(System/currentTimeMillis))
+            (if (instance? DRPCExecutionException result)
+              (throw result)
+              (if (nil? result)
+                (throw (DRPCExecutionException. "Request timed out"))
+                result)))))
+
+      DistributedRPCInvocations$Iface
+
+      (^void result
+        [this ^String id ^String result]
+        (mark! drpc:num-result-calls)
+        (when-let [func (@id->function id)]
+          (check-authorization drpc-acl-handler
+                               {DRPCAuthorizerBase/FUNCTION_NAME func}
+                               "result")
+          (let [^Semaphore sem (@id->sem id)]
+            (log-debug "Received result " result " for " id " at " 
(System/currentTimeMillis))
+            (when sem
+              (swap! id->result assoc id result)
+              (.release sem)
+              ))))
+
+      (^void failRequest
+        [this ^String id]
+        (mark! drpc:num-failRequest-calls)
+        (when-let [func (@id->function id)]
+          (check-authorization drpc-acl-handler
+                               {DRPCAuthorizerBase/FUNCTION_NAME func}
+                               "failRequest")
+          (let [^Semaphore sem (@id->sem id)]
+            (when sem
+              (swap! id->result assoc id (DRPCExecutionException. "Request 
failed"))
+              (.release sem)))))
+
+      ;; Thrift call that hands out the next queued request for `func`.
+      ;; When the queue has nothing to offer, a DRPCRequest built from two
+      ;; empty strings is returned instead.
+      (^DRPCRequest fetchRequest
+        [this ^String func]
+        (mark! drpc:num-fetchRequest-calls)
+        (check-authorization drpc-acl-handler
+                             {DRPCAuthorizerBase/FUNCTION_NAME func}
+                             "fetchRequest")
+        (let [^ConcurrentLinkedQueue pending
+              (acquire-queue request-queues func)]
+          (if-let [next-req (.poll pending)]
+            (do
+              (log-debug "Fetched request for " func " at "
+                         (System/currentTimeMillis))
+              next-req)
+            (DRPCRequest. "" ""))))
+
+      Shutdownable
+
+      ;; Shutdownable implementation: marks the shutdown meter and
+      ;; interrupts clear-thread.
+      (shutdown
+        [this]
+        (mark! drpc:num-shutdown-calls)
+        ;; The extra closing parentheses on the next line end the forms
+        ;; that enclose this method.
+        (.interrupt clear-thread)))))
+
+(defn handle-request
+  "Wraps `handler` in a one-argument function that passes the request
+  straight through to it and returns whatever it returns."
+  [handler]
+  #(handler %))
+
+(defn populate-context!
+  "Populate the Storm request context (ReqContext/context) from a servlet
+  request. This should be called in each handler. Does nothing and returns
+  nil when http-creds-handler is nil or false."
+  [http-creds-handler servlet-request]
+  (when-let [creds-plugin http-creds-handler]
+    (.populateContext creds-plugin (ReqContext/context) servlet-request)))
+
+(defn webapp
+  "Builds the HTTP request handler for DRPC. POST routes take the function
+  arguments from the request body; GET routes take them from the path, or
+  use an empty string when the path carries none. The HTTP request meter
+  is marked once per handled request."
+  [handler http-creds-handler]
+  (let [run-request (fn [servlet-request func args]
+                      ;; Mark here rather than at the top of webapp, so
+                      ;; the meter counts requests instead of counting
+                      ;; how often the route table is built.
+                      (mark! drpc:num-execute-http-requests)
+                      (populate-context! http-creds-handler servlet-request)
+                      (.execute handler func args))]
+    (->
+      (routes
+        (POST "/drpc/:func" [:as {:keys [body servlet-request]} func & m]
+          (run-request servlet-request func (slurp body)))
+        (POST "/drpc/:func/" [:as {:keys [body servlet-request]} func & m]
+          (run-request servlet-request func (slurp body)))
+        (GET "/drpc/:func/:args" [:as {:keys [servlet-request]} func args & m]
+          (run-request servlet-request func args))
+        (GET "/drpc/:func/" [:as {:keys [servlet-request]} func & m]
+          (run-request servlet-request func ""))
+        (GET "/drpc/:func" [:as {:keys [servlet-request]} func & m]
+          (run-request servlet-request func "")))
+      (wrap-reload '[org.apache.storm.daemon.drpc])
+      handle-request)))
+
+(defn launch-server!
+  "Reads the Storm config and starts the DRPC servers: the invocations
+  thrift server (served from a future), the DRPC thrift server when
+  DRPC-PORT is positive, and the HTTP server when DRPC-HTTP-PORT is
+  positive. A shutdown hook stops the thrift servers. When the DRPC thrift
+  server exists, `.serve` is called on it from the calling thread as the
+  last step (presumably blocking - confirm against ThriftServer)."
+  ([]
+    (log-message "Starting drpc server for storm version '" STORM-VERSION "'")
+    (let [conf (read-storm-config)
+          ;; NOTE(review): worker-threads and queue-size are not referenced
+          ;; anywhere else in this function.
+          worker-threads (int (conf DRPC-WORKER-THREADS))
+          queue-size (int (conf DRPC-QUEUE-SIZE))
+          drpc-http-port (int (conf DRPC-HTTP-PORT))
+          drpc-port (int (conf DRPC-PORT))
+          drpc-service-handler (service-handler conf)
+          ;; requests and returns need to be on separate thread pools,
+          ;; since calls to "execute" don't unblock until other thrift
+          ;; methods are called. So if 64 threads are calling execute, the
+          ;; server won't accept the result invocations that will unblock
+          ;; those threads
+          handler-server (when (> drpc-port 0)
+                           (ThriftServer. conf
+                             (DistributedRPC$Processor. drpc-service-handler)
+                             ThriftConnectionType/DRPC))
+          invoke-server (ThriftServer. conf
+                          (DistributedRPCInvocations$Processor.
+                            drpc-service-handler)
+                          ThriftConnectionType/DRPC_INVOCATIONS)
+          http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
+      (add-shutdown-hook-with-force-kill-in-1-sec
+        (fn []
+          ;; handler-server is nil when DRPC-PORT is not positive.
+          (when handler-server
+            (.stop handler-server))
+          (.stop invoke-server)))
+      (log-message "Starting Distributed RPC servers...")
+      ;; Serve invocations from a future so the rest of the startup runs.
+      (future (.serve invoke-server))
+      (when (> drpc-http-port 0)
+        (let [app (-> (webapp drpc-service-handler http-creds-handler)
+                    requests-middleware)
+              filter-class (conf DRPC-HTTP-FILTER)
+              filter-params (conf DRPC-HTTP-FILTER-PARAMS)
+              filters-confs [{:filter-class filter-class
+                              :filter-params filter-params}]
+              https-port (int (conf DRPC-HTTPS-PORT))
+              https-ks-path (conf DRPC-HTTPS-KEYSTORE-PATH)
+              https-ks-password (conf DRPC-HTTPS-KEYSTORE-PASSWORD)
+              https-ks-type (conf DRPC-HTTPS-KEYSTORE-TYPE)
+              https-key-password (conf DRPC-HTTPS-KEY-PASSWORD)
+              https-ts-path (conf DRPC-HTTPS-TRUSTSTORE-PATH)
+              https-ts-password (conf DRPC-HTTPS-TRUSTSTORE-PASSWORD)
+              https-ts-type (conf DRPC-HTTPS-TRUSTSTORE-TYPE)
+              https-want-client-auth (conf DRPC-HTTPS-WANT-CLIENT-AUTH)
+              https-need-client-auth (conf DRPC-HTTPS-NEED-CLIENT-AUTH)]
+          (storm-run-jetty
+           {:port drpc-http-port
+            :configurator (fn [server]
+                            ;; need-client-auth is passed before
+                            ;; want-client-auth, the reverse of the order
+                            ;; in which they are bound above.
+                            (config-ssl server
+                                        https-port
+                                        https-ks-path
+                                        https-ks-password
+                                        https-ks-type
+                                        https-key-password
+                                        https-ts-path
+                                        https-ts-password
+                                        https-ts-type
+                                        https-need-client-auth
+                                        https-want-client-auth)
+                            (config-filter server app filters-confs))})))
+      (start-metrics-reporters)
+      (when handler-server
+        (.serve handler-server)))))
+
+(defn -main
+  "Entry point: installs the default uncaught exception handler, then
+  launches the DRPC server."
+  []
+  (setup-default-uncaught-exception-handler)
+  (launch-server!))

Reply via email to