http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/config_value.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/config_value.clj b/storm-core/src/clj/backtype/storm/command/config_value.clj deleted file mode 100644 index 1d193a2..0000000 --- a/storm-core/src/clj/backtype/storm/command/config_value.clj +++ /dev/null @@ -1,24 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.config-value - (:use [backtype.storm config log]) - (:gen-class)) - - -(defn -main [^String name] - (let [conf (read-storm-config)] - (println "VALUE:" (conf name)) - ))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/deactivate.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/deactivate.clj b/storm-core/src/clj/backtype/storm/command/deactivate.clj deleted file mode 100644 index 1a614de..0000000 --- a/storm-core/src/clj/backtype/storm/command/deactivate.clj +++ /dev/null @@ -1,24 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns backtype.storm.command.deactivate - (:use [backtype.storm thrift log]) - (:gen-class)) - -(defn -main [name] - (with-configured-nimbus-connection nimbus - (.deactivate nimbus name) - (log-message "Deactivated topology: " name) - )) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj b/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj deleted file mode 100644 index d90e72a..0000000 --- a/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj +++ /dev/null @@ -1,26 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns backtype.storm.command.dev-zookeeper - (:use [backtype.storm zookeeper util config]) - (:gen-class)) - -(defn -main [& args] - (let [conf (read-storm-config) - port (conf STORM-ZOOKEEPER-PORT) - localpath (conf DEV-ZOOKEEPER-PATH)] - (rmr localpath) - (mk-inprocess-zookeeper localpath :port port) - )) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/get_errors.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/get_errors.clj b/storm-core/src/clj/backtype/storm/command/get_errors.clj deleted file mode 100644 index 60707b2..0000000 --- a/storm-core/src/clj/backtype/storm/command/get_errors.clj +++ /dev/null @@ -1,52 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns backtype.storm.command.get-errors - (:use [clojure.tools.cli :only [cli]]) - (:use [backtype.storm thrift log]) - (:use [backtype.storm util]) - (:require [backtype.storm.daemon - [nimbus :as nimbus] - [common :as common]]) - (:import [backtype.storm.generated GetInfoOptions NumErrorsChoice - TopologySummary ErrorInfo]) - (:gen-class)) - -(defn get-topology-id [name topologies] - (let [topology (first (filter #(= (.get_name %1) name) topologies))] - (when (not-nil? topology) (.get_id topology)))) - -(defn get-component-errors - [topology-errors] - (apply hash-map (remove nil? - (flatten (for [[comp-name comp-errors] topology-errors] - (let [latest-error (when (not (empty? comp-errors)) (first comp-errors))] - (if latest-error [comp-name (.get_error ^ErrorInfo latest-error)]))))))) - -(defn -main [name] - (with-configured-nimbus-connection nimbus - (let [opts (doto (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/ONE)) - cluster-info (.getClusterInfo nimbus) - topologies (.get_topologies cluster-info) - topo-id (get-topology-id name topologies) - topo-info (when (not-nil? topo-id) (.getTopologyInfoWithOpts nimbus topo-id opts))] - (if (or (nil? topo-id) (nil? 
topo-info)) - (println (to-json {"Failure" (str "No topologies running with name " name)})) - (let [topology-name (.get_name topo-info) - topology-errors (.get_errors topo-info)] - (println (to-json (hash-map - "Topology Name" topology-name - "Comp-Errors" (get-component-errors topology-errors))))))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/healthcheck.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/healthcheck.clj b/storm-core/src/clj/backtype/storm/command/healthcheck.clj deleted file mode 100644 index 14af223..0000000 --- a/storm-core/src/clj/backtype/storm/command/healthcheck.clj +++ /dev/null @@ -1,88 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.healthcheck - (:require [backtype.storm - [config :refer :all] - [log :refer :all]] - [clojure.java [io :as io]] - [clojure [string :refer [split]]]) - (:gen-class)) - -(defn interrupter - "Interrupt a given thread after ms milliseconds." - [thread ms] - (let [interrupter (Thread. 
- (fn [] - (try - (Thread/sleep ms) - (.interrupt thread) - (catch InterruptedException e))))] - (.start interrupter) - interrupter)) - -(defn check-output [lines] - (if (some #(.startsWith % "ERROR") lines) - :failed - :success)) - -(defn process-script [conf script] - (let [script-proc (. (Runtime/getRuntime) (exec script)) - curthread (Thread/currentThread) - interrupter-thread (interrupter curthread - (conf STORM-HEALTH-CHECK-TIMEOUT-MS))] - (try - (.waitFor script-proc) - (.interrupt interrupter-thread) - (if (not (= (.exitValue script-proc) 0)) - :failed_with_exit_code - (check-output (split - (slurp (.getInputStream script-proc)) - #"\n+"))) - (catch InterruptedException e - (println "Script" script "timed out.") - :timeout) - (catch Exception e - (println "Script failed with exception: " e) - :failed_with_exception) - (finally (.interrupt interrupter-thread))))) - -(defn health-check [conf] - (let [health-dir (absolute-healthcheck-dir conf) - health-files (file-seq (io/file health-dir)) - health-scripts (filter #(and (.canExecute %) - (not (.isDirectory %))) - health-files) - results (->> health-scripts - (map #(.getAbsolutePath %)) - (map (partial process-script conf)))] - (log-message - (pr-str (map #'vector - (map #(.getAbsolutePath %) health-scripts) - results))) - ; failed_with_exit_code is OK. We're mimicing Hadoop's health checks. - ; We treat non-zero exit codes as indicators that the scripts failed - ; to execute properly, not that the system is unhealthy, in which case - ; we don't want to start killing things. - (if (every? 
#(or (= % :failed_with_exit_code) - (= % :success)) - results) - 0 - 1))) - -(defn -main [& args] - (let [conf (read-storm-config)] - (System/exit - (health-check conf)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/heartbeats.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/heartbeats.clj b/storm-core/src/clj/backtype/storm/command/heartbeats.clj deleted file mode 100644 index 99790aa..0000000 --- a/storm-core/src/clj/backtype/storm/command/heartbeats.clj +++ /dev/null @@ -1,52 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns backtype.storm.command.heartbeats - (:require [backtype.storm - [config :refer :all] - [log :refer :all] - [cluster :refer :all] - [converter :refer :all]] - [clojure.string :refer :all]) - (:import [backtype.storm.generated ClusterWorkerHeartbeat] - [backtype.storm.utils Utils]) - (:gen-class)) - -(defn -main [command path & args] - (let [conf (read-storm-config) - cluster (mk-distributed-cluster-state conf :auth-conf conf)] - (println "Command: [" command "]") - (condp = command - "list" - (let [message (join " \n" (.get_worker_hb_children cluster path false))] - (log-message "list " path ":\n" - message "\n")) - "get" - (log-message - (if-let [hb (.get_worker_hb cluster path false)] - (clojurify-zk-worker-hb - (Utils/deserialize - hb - ClusterWorkerHeartbeat)) - "Nothing")) - - (log-message "Usage: heartbeats [list|get] path")) - - (try - (.close cluster) - (catch Exception e - (log-message "Caught exception: " e " on close.")))) - (System/exit 0)) - http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/kill_topology.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/kill_topology.clj b/storm-core/src/clj/backtype/storm/command/kill_topology.clj deleted file mode 100644 index 94b4585..0000000 --- a/storm-core/src/clj/backtype/storm/command/kill_topology.clj +++ /dev/null @@ -1,29 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. 
You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.kill-topology - (:use [clojure.tools.cli :only [cli]]) - (:use [backtype.storm thrift config log]) - (:import [backtype.storm.generated KillOptions]) - (:gen-class)) - -(defn -main [& args] - (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)]) - opts (KillOptions.)] - (if wait (.set_wait_secs opts wait)) - (with-configured-nimbus-connection nimbus - (.killTopologyWithOpts nimbus name opts) - (log-message "Killed topology: " name) - ))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/kill_workers.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/kill_workers.clj b/storm-core/src/clj/backtype/storm/command/kill_workers.clj deleted file mode 100644 index 3866cc7..0000000 --- a/storm-core/src/clj/backtype/storm/command/kill_workers.clj +++ /dev/null @@ -1,33 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. 
You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.kill-workers - (:import [java.io File]) - (:use [backtype.storm.daemon common]) - (:use [backtype.storm util config]) - (:require [backtype.storm.daemon - [supervisor :as supervisor]]) - (:gen-class)) - -(defn -main - "Construct the supervisor-data from scratch and kill the workers on this supervisor" - [& args] - (let [conf (read-storm-config) - conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath)) - isupervisor (supervisor/standalone-supervisor) - supervisor-data (supervisor/supervisor-data conf nil isupervisor) - ids (supervisor/my-worker-ids conf)] - (doseq [id ids] - (supervisor/shutdown-worker supervisor-data id)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/list.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/list.clj b/storm-core/src/clj/backtype/storm/command/list.clj deleted file mode 100644 index 79cfcf7..0000000 --- a/storm-core/src/clj/backtype/storm/command/list.clj +++ /dev/null @@ -1,38 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. 
You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.list - (:use [backtype.storm thrift log]) - (:import [backtype.storm.generated TopologySummary]) - (:gen-class)) - -(defn -main [] - (with-configured-nimbus-connection nimbus - (let [cluster-info (.getClusterInfo nimbus) - topologies (.get_topologies cluster-info) - msg-format "%-20s %-10s %-10s %-12s %-10s"] - (if (or (nil? topologies) (empty? topologies)) - (println "No topologies running.") - (do - (println (format msg-format "Topology_name" "Status" "Num_tasks" "Num_workers" "Uptime_secs")) - (println "-------------------------------------------------------------------") - (doseq [^TopologySummary topology topologies] - (let [topology-name (.get_name topology) - topology-status (.get_status topology) - topology-num-tasks (.get_num_tasks topology) - topology-num-workers (.get_num_workers topology) - topology-uptime-secs (.get_uptime_secs topology)] - (println (format msg-format topology-name topology-status topology-num-tasks - topology-num-workers topology-uptime-secs))))))))) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/monitor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/monitor.clj b/storm-core/src/clj/backtype/storm/command/monitor.clj deleted file mode 100644 index 36ccbc9..0000000 --- a/storm-core/src/clj/backtype/storm/command/monitor.clj +++ /dev/null @@ -1,37 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more 
contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.monitor - (:use [clojure.tools.cli :only [cli]]) - (:use [backtype.storm.thrift :only [with-configured-nimbus-connection]]) - (:import [backtype.storm.utils Monitor]) - (:gen-class) - ) - -(defn -main [& args] - (let [[{interval :interval component :component stream :stream watch :watch} [name] _] - (cli args ["-i" "--interval" :default 4 :parse-fn #(Integer/parseInt %)] - ["-m" "--component" :default nil] - ["-s" "--stream" :default "default"] - ["-w" "--watch" :default "emitted"]) - mon (Monitor.)] - (if interval (.set_interval mon interval)) - (if name (.set_topology mon name)) - (if component (.set_component mon component)) - (if stream (.set_stream mon stream)) - (if watch (.set_watch mon watch)) - (with-configured-nimbus-connection nimbus - (.metrics mon nimbus) - ))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/rebalance.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/rebalance.clj b/storm-core/src/clj/backtype/storm/command/rebalance.clj deleted file mode 100644 index e3a032b..0000000 --- a/storm-core/src/clj/backtype/storm/command/rebalance.clj +++ /dev/null @@ -1,46 +0,0 @@ 
-;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.rebalance - (:use [clojure.tools.cli :only [cli]]) - (:use [backtype.storm thrift config log]) - (:import [backtype.storm.generated RebalanceOptions]) - (:gen-class)) - -(defn- parse-executor [^String s] - (let [eq-pos (.lastIndexOf s "=") - name (.substring s 0 eq-pos) - amt (.substring s (inc eq-pos))] - {name (Integer/parseInt amt)} - )) - -(defn -main [& args] - (let [[{wait :wait executor :executor num-workers :num-workers} [name] _] - (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)] - ["-n" "--num-workers" :default nil :parse-fn #(Integer/parseInt %)] - ["-e" "--executor" :parse-fn parse-executor - :assoc-fn (fn [previous key val] - (assoc previous key - (if-let [oldval (get previous key)] - (merge oldval val) - val)))]) - opts (RebalanceOptions.)] - (if wait (.set_wait_secs opts wait)) - (if executor (.set_num_executors opts executor)) - (if num-workers (.set_num_workers opts num-workers)) - (with-configured-nimbus-connection nimbus - (.rebalance nimbus name opts) - (log-message "Topology " name " is rebalancing") - ))) 
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/set_log_level.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/set_log_level.clj b/storm-core/src/clj/backtype/storm/command/set_log_level.clj deleted file mode 100644 index 88b297d..0000000 --- a/storm-core/src/clj/backtype/storm/command/set_log_level.clj +++ /dev/null @@ -1,75 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.set-log-level - (:use [clojure.tools.cli :only [cli]]) - (:use [backtype.storm thrift log]) - (:import [org.apache.logging.log4j Level]) - (:import [backtype.storm.generated LogConfig LogLevel LogLevelAction]) - (:gen-class)) - -(defn- get-storm-id - "Get topology id for a running topology from the topology name." 
- [nimbus name] - (let [info (.getClusterInfo nimbus) - topologies (.get_topologies info) - topology (first (filter (fn [topo] (= name (.get_name topo))) topologies))] - (if topology - (.get_id topology) - (throw (.IllegalArgumentException (str name " is not a running topology")))))) - -(defn- parse-named-log-levels [action] - "Parses [logger name]=[level string]:[optional timeout],[logger name2]... - - e.g. ROOT=DEBUG:30 - root logger, debug for 30 seconds - - org.apache.foo=WARN - org.apache.foo set to WARN indefinitely" - (fn [^String s] - (let [log-args (re-find #"(.*)=([A-Z]+):?(\d*)" s) - name (if (= action LogLevelAction/REMOVE) s (nth log-args 1)) - level (Level/toLevel (nth log-args 2)) - timeout-str (nth log-args 3) - log-level (LogLevel.)] - (if (= action LogLevelAction/REMOVE) - (.set_action log-level action) - (do - (.set_action log-level action) - (.set_target_log_level log-level (.toString level)) - (.set_reset_log_level_timeout_secs log-level - (Integer. (if (= timeout-str "") "0" timeout-str))))) - {name log-level}))) - -(defn- merge-together [previous key val] - (assoc previous key - (if-let [oldval (get previous key)] - (merge oldval val) - val))) - -(defn -main [& args] - (let [[{log-setting :log-setting remove-log-setting :remove-log-setting} [name] _] - (cli args ["-l" "--log-setting" - :parse-fn (parse-named-log-levels LogLevelAction/UPDATE) - :assoc-fn merge-together] - ["-r" "--remove-log-setting" - :parse-fn (parse-named-log-levels LogLevelAction/REMOVE) - :assoc-fn merge-together]) - log-config (LogConfig.)] - (doseq [[log-name log-val] (merge log-setting remove-log-setting)] - (.put_to_named_logger_level log-config log-name log-val)) - (log-message "Sent log config " log-config " for topology " name) - (with-configured-nimbus-connection nimbus - (.setLogConfig nimbus (get-storm-id nimbus name) log-config)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/shell_submission.clj 
---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/shell_submission.clj b/storm-core/src/clj/backtype/storm/command/shell_submission.clj deleted file mode 100644 index 9bb8efe..0000000 --- a/storm-core/src/clj/backtype/storm/command/shell_submission.clj +++ /dev/null @@ -1,33 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.shell-submission - (:import [backtype.storm StormSubmitter]) - (:use [backtype.storm thrift util config log zookeeper]) - (:require [clojure.string :as str]) - (:gen-class)) - - -(defn -main [^String tmpjarpath & args] - (let [conf (read-storm-config) - zk-leader-elector (zk-leader-elector conf) - leader-nimbus (.getLeader zk-leader-elector) - host (.getHost leader-nimbus) - port (.getPort leader-nimbus) - no-op (.close zk-leader-elector) - jarpath (StormSubmitter/submitJar conf tmpjarpath) - args (concat args [host port jarpath])] - (exec-command! 
(str/join " " args)) - )) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/upload_credentials.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/upload_credentials.clj b/storm-core/src/clj/backtype/storm/command/upload_credentials.clj deleted file mode 100644 index 05a82cb..0000000 --- a/storm-core/src/clj/backtype/storm/command/upload_credentials.clj +++ /dev/null @@ -1,35 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.command.upload-credentials - (:use [clojure.tools.cli :only [cli]]) - (:use [backtype.storm log util]) - (:import [backtype.storm StormSubmitter]) - (:import [java.util Properties]) - (:import [java.io FileReader]) - (:gen-class)) - -(defn read-map [file-name] - (let [props (Properties. ) - _ (.load props (FileReader. file-name))] - (clojurify-structure props))) - -(defn -main [& args] - (let [[{cred-file :file} [name & rawCreds]] (cli args ["-f" "--file" :default nil]) - _ (when (and rawCreds (not (even? (.size rawCreds)))) (throw (RuntimeException. 
"Need an even number of arguments to make a map"))) - mapping (if rawCreds (apply assoc {} rawCreds) {}) - file-mapping (if (nil? cred-file) {} (read-map cred-file))] - (StormSubmitter/pushCredentials name {} (merge file-mapping mapping)) - (log-message "Uploaded new creds to topology: " name))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/config.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj deleted file mode 100644 index 4d24f97..0000000 --- a/storm-core/src/clj/backtype/storm/config.clj +++ /dev/null @@ -1,331 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
- -(ns backtype.storm.config - (:import [java.io FileReader File IOException] - [backtype.storm.generated StormTopology]) - (:import [backtype.storm Config]) - (:import [backtype.storm.utils Utils LocalState]) - (:import [backtype.storm.validation ConfigValidation]) - (:import [org.apache.commons.io FileUtils]) - (:require [clojure [string :as str]]) - (:use [backtype.storm log util])) - -(def RESOURCES-SUBDIR "resources") -(def NIMBUS-DO-NOT-REASSIGN "NIMBUS-DO-NOT-REASSIGN") - -(defn- clojure-config-name [name] - (.replace (.toUpperCase name) "_" "-")) - -; define clojure constants for every configuration parameter -(doseq [f (seq (.getFields Config))] - (let [name (.getName f) - new-name (clojure-config-name name)] - (eval - `(def ~(symbol new-name) (. Config ~(symbol name)))))) - -(def ALL-CONFIGS - (dofor [f (seq (.getFields Config))] - (.get f nil))) - - -(defn cluster-mode - [conf & args] - (keyword (conf STORM-CLUSTER-MODE))) - -(defn local-mode? - [conf] - (let [mode (conf STORM-CLUSTER-MODE)] - (condp = mode - "local" true - "distributed" false - (throw (IllegalArgumentException. 
- (str "Illegal cluster mode in conf: " mode)))))) - -(defn sampling-rate - [conf] - (->> (conf TOPOLOGY-STATS-SAMPLE-RATE) - (/ 1) - int)) - -(defn mk-stats-sampler - [conf] - (even-sampler (sampling-rate conf))) - -(defn read-default-config - [] - (clojurify-structure (Utils/readDefaultConfig))) - -(defn validate-configs-with-schemas - [conf] - (ConfigValidation/validateFields conf)) - -(defn read-storm-config - [] - (let [conf (clojurify-structure (Utils/readStormConfig))] - (validate-configs-with-schemas conf) - conf)) - -(defn read-yaml-config - ([name must-exist] - (let [conf (clojurify-structure (Utils/findAndReadConfigFile name must-exist))] - (validate-configs-with-schemas conf) - conf)) - ([name] - (read-yaml-config true))) - -(defn absolute-storm-local-dir [conf] - (let [storm-home (System/getProperty "storm.home") - path (conf STORM-LOCAL-DIR)] - (if path - (if (is-absolute-path? path) path (str storm-home file-path-separator path)) - (str storm-home file-path-separator "storm-local")))) - -(def LOG-DIR - (.getCanonicalPath - (clojure.java.io/file (or (System/getProperty "storm.log.dir") - (get (read-storm-config) "storm.log.dir") - (str (System/getProperty "storm.home") file-path-separator "logs"))))) - -(defn absolute-healthcheck-dir [conf] - (let [storm-home (System/getProperty "storm.home") - path (conf STORM-HEALTH-CHECK-DIR)] - (if path - (if (is-absolute-path? path) path (str storm-home file-path-separator path)) - (str storm-home file-path-separator "healthchecks")))) - -(defn master-local-dir - [conf] - (let [ret (str (absolute-storm-local-dir conf) file-path-separator "nimbus")] - (FileUtils/forceMkdir (File. 
ret)) - ret)) - -(defn master-stormjar-key - [topology-id] - (str topology-id "-stormjar.jar")) - -(defn master-stormcode-key - [topology-id] - (str topology-id "-stormcode.ser")) - -(defn master-stormconf-key - [topology-id] - (str topology-id "-stormconf.ser")) - -(defn master-stormdist-root - ([conf] - (str (master-local-dir conf) file-path-separator "stormdist")) - ([conf storm-id] - (str (master-stormdist-root conf) file-path-separator storm-id))) - -(defn master-tmp-dir - [conf] - (let [ret (str (master-local-dir conf) file-path-separator "tmp")] - (FileUtils/forceMkdir (File. ret)) - ret )) - -(defn read-supervisor-storm-conf-given-path - [conf stormconf-path] - (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path)))))) - -(defn master-storm-metafile-path [stormroot ] - (str stormroot file-path-separator "storm-code-distributor.meta")) - -(defn master-stormjar-path - [stormroot] - (str stormroot file-path-separator "stormjar.jar")) - -(defn master-stormcode-path - [stormroot] - (str stormroot file-path-separator "stormcode.ser")) - -(defn master-stormconf-path - [stormroot] - (str stormroot file-path-separator "stormconf.ser")) - -(defn master-inbox - [conf] - (let [ret (str (master-local-dir conf) file-path-separator "inbox")] - (FileUtils/forceMkdir (File. ret)) - ret )) - -(defn master-inimbus-dir - [conf] - (str (master-local-dir conf) file-path-separator "inimbus")) - -(defn supervisor-local-dir - [conf] - (let [ret (str (absolute-storm-local-dir conf) file-path-separator "supervisor")] - (FileUtils/forceMkdir (File. 
ret)) - ret)) - -(defn supervisor-isupervisor-dir - [conf] - (str (supervisor-local-dir conf) file-path-separator "isupervisor")) - -(defn supervisor-stormdist-root - ([conf] - (str (supervisor-local-dir conf) file-path-separator "stormdist")) - ([conf storm-id] - (str (supervisor-stormdist-root conf) file-path-separator (url-encode storm-id)))) - -(defn supervisor-stormjar-path [stormroot] - (str stormroot file-path-separator "stormjar.jar")) - -(defn supervisor-storm-metafile-path [stormroot] - (str stormroot file-path-separator "storm-code-distributor.meta")) - -(defn supervisor-stormcode-path - [stormroot] - (str stormroot file-path-separator "stormcode.ser")) - -(defn supervisor-stormconf-path - [stormroot] - (str stormroot file-path-separator "stormconf.ser")) - -(defn supervisor-tmp-dir - [conf] - (let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")] - (FileUtils/forceMkdir (File. ret)) - ret )) - -(defn supervisor-storm-resources-path - [stormroot] - (str stormroot file-path-separator RESOURCES-SUBDIR)) - -(defn ^LocalState supervisor-state - [conf] - (LocalState. (str (supervisor-local-dir conf) file-path-separator "localstate"))) - -(defn ^LocalState nimbus-topo-history-state - [conf] - (LocalState. (str (master-local-dir conf) file-path-separator "history"))) - -(defn read-supervisor-storm-conf - [conf storm-id] - (let [stormroot (supervisor-stormdist-root conf storm-id) - conf-path (supervisor-stormconf-path stormroot)] - (read-supervisor-storm-conf-given-path conf conf-path))) - -(defn read-supervisor-topology - [conf storm-id] - (let [stormroot (supervisor-stormdist-root conf storm-id) - topology-path (supervisor-stormcode-path stormroot)] - (Utils/deserialize (FileUtils/readFileToByteArray (File. 
topology-path)) StormTopology) - )) - -(defn worker-user-root [conf] - (str (absolute-storm-local-dir conf) "/workers-users")) - -(defn worker-user-file [conf worker-id] - (str (worker-user-root conf) "/" worker-id)) - -(defn get-worker-user [conf worker-id] - (log-message "GET worker-user " worker-id) - (try - (str/trim (slurp (worker-user-file conf worker-id))) - (catch IOException e - (log-warn-error e "Failed to get worker user for " worker-id ".") - nil - ))) - -(defn get-id-from-blob-key - [key] - (if-let [groups (re-find #"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)] - (nth groups 1))) - -(defn set-worker-user! [conf worker-id user] - (log-message "SET worker-user " worker-id " " user) - (let [file (worker-user-file conf worker-id)] - (.mkdirs (.getParentFile (File. file))) - (spit (worker-user-file conf worker-id) user))) - -(defn remove-worker-user! [conf worker-id] - (log-message "REMOVE worker-user " worker-id) - (.delete (File. (worker-user-file conf worker-id)))) - -(defn worker-artifacts-root - ([conf] - (let [workers-artifacts-dir (conf STORM-WORKERS-ARTIFACTS-DIR)] - (if workers-artifacts-dir - (if (is-absolute-path? 
workers-artifacts-dir) - workers-artifacts-dir - (str LOG-DIR file-path-separator workers-artifacts-dir)) - (str LOG-DIR file-path-separator "workers-artifacts")))) - ([conf id] - (str (worker-artifacts-root conf) file-path-separator id)) - ([conf id port] - (str (worker-artifacts-root conf id) file-path-separator port))) - -(defn worker-artifacts-pid-path - [conf id port] - (str (worker-artifacts-root conf id port) file-path-separator "worker.pid")) - -(defn get-log-metadata-file - ([fname] - (let [[id port & _] (str/split fname (re-pattern file-path-separator))] - (get-log-metadata-file (read-storm-config) id port))) - ([conf id port] - (clojure.java.io/file (str (worker-artifacts-root conf id) file-path-separator port file-path-separator) "worker.yaml"))) - -(defn get-worker-dir-from-root - [log-root id port] - (clojure.java.io/file (str log-root file-path-separator id file-path-separator port))) - -(defn worker-root - ([conf] - (str (absolute-storm-local-dir conf) file-path-separator "workers")) - ([conf id] - (str (worker-root conf) file-path-separator id))) - -(defn worker-pids-root - [conf id] - (str (worker-root conf id) file-path-separator "pids")) - -(defn worker-pid-path - [conf id pid] - (str (worker-pids-root conf id) file-path-separator pid)) - -(defn worker-heartbeats-root - [conf id] - (str (worker-root conf id) file-path-separator "heartbeats")) - -;; workers heartbeat here with pid and timestamp -;; if supervisor stops receiving heartbeat, it kills and restarts the process -;; in local mode, keep a global map of ids to threads for simulating process management -(defn ^LocalState worker-state - [conf id] - (LocalState. (worker-heartbeats-root conf id))) - -(defn override-login-config-with-system-property [conf] - (if-let [login_conf_file (System/getProperty "java.security.auth.login.config")] - (assoc conf "java.security.auth.login.config" login_conf_file) - conf)) - -(defn get-topo-logs-users - [topology-conf] - (sort (distinct (remove nil? 
- (concat - (topology-conf LOGS-USERS) - (topology-conf TOPOLOGY-USERS)))))) - -(defn get-topo-logs-groups - [topology-conf] - (sort (distinct (remove nil? - (concat - (topology-conf LOGS-GROUPS) - (topology-conf TOPOLOGY-GROUPS)))))) - http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/converter.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj deleted file mode 100644 index 52a1817..0000000 --- a/storm-core/src/clj/backtype/storm/converter.clj +++ /dev/null @@ -1,277 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.converter - (:import [backtype.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources - StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions - TopologyActionOptions DebugOptions ProfileRequest]) - (:use [backtype.storm util stats log]) - (:require [backtype.storm.daemon [common :as common]])) - -(defn thriftify-supervisor-info [supervisor-info] - (doto (SupervisorInfo.) 
- (.set_time_secs (long (:time-secs supervisor-info))) - (.set_hostname (:hostname supervisor-info)) - (.set_assignment_id (:assignment-id supervisor-info)) - (.set_used_ports (map long (:used-ports supervisor-info))) - (.set_meta (map long (:meta supervisor-info))) - (.set_scheduler_meta (:scheduler-meta supervisor-info)) - (.set_uptime_secs (long (:uptime-secs supervisor-info))) - (.set_version (:version supervisor-info)) - (.set_resources_map (:resources-map supervisor-info)) - )) - -(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info] - (if supervisor-info - (backtype.storm.daemon.common.SupervisorInfo. - (.get_time_secs supervisor-info) - (.get_hostname supervisor-info) - (.get_assignment_id supervisor-info) - (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info))) - (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info))) - (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info))) - (.get_uptime_secs supervisor-info) - (.get_version supervisor-info) - (if-let [res-map (.get_resources_map supervisor-info)] (into {} res-map))))) - -(defn thriftify-assignment [assignment] - (let [thrift-assignment (doto (Assignment.) - (.set_master_code_dir (:master-code-dir assignment)) - (.set_node_host (:node->host assignment)) - (.set_executor_node_port (into {} - (map (fn [[k v]] - [(map long k) - (NodeInfo. (first v) (set (map long (rest v))))]) - (:executor->node+port assignment)))) - (.set_executor_start_time_secs - (into {} - (map (fn [[k v]] - [(map long k) (long v)]) - (:executor->start-time-secs assignment)))))] - (if (:worker->resources assignment) - (.set_worker_resources thrift-assignment (into {} (map - (fn [[node+port resources]] - [(NodeInfo. (first node+port) (set (map long (rest node+port)))) - (doto (WorkerResources.) 
- (.set_mem_on_heap (first resources)) - (.set_mem_off_heap (second resources)) - (.set_cpu (last resources)))]) - (:worker->resources assignment))))) - thrift-assignment)) - -(defn clojurify-executor->node_port [executor->node_port] - (into {} - (map-val - (fn [nodeInfo] - (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..] - (map-key - (fn [list-of-executors] - (into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable. - executor->node_port)))) - -(defn clojurify-worker->resources [worker->resources] - "convert worker info to be [node, port] - convert resources to be [mem_on_heap mem_off_heap cpu]" - (into {} (map - (fn [[nodeInfo resources]] - [(concat [(.get_node nodeInfo)] (.get_port nodeInfo)) - [(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]]) - worker->resources))) - -(defn clojurify-assignment [^Assignment assignment] - (if assignment - (backtype.storm.daemon.common.Assignment. 
- (.get_master_code_dir assignment) - (into {} (.get_node_host assignment)) - (clojurify-executor->node_port (into {} (.get_executor_node_port assignment))) - (map-key (fn [executor] (into [] executor)) - (into {} (.get_executor_start_time_secs assignment))) - (clojurify-worker->resources (into {} (.get_worker_resources assignment)))))) - -(defn convert-to-symbol-from-status [status] - (condp = status - TopologyStatus/ACTIVE {:type :active} - TopologyStatus/INACTIVE {:type :inactive} - TopologyStatus/REBALANCING {:type :rebalancing} - TopologyStatus/KILLED {:type :killed} - nil)) - -(defn- convert-to-status-from-symbol [status] - (if status - (condp = (:type status) - :active TopologyStatus/ACTIVE - :inactive TopologyStatus/INACTIVE - :rebalancing TopologyStatus/REBALANCING - :killed TopologyStatus/KILLED - nil))) - -(defn clojurify-rebalance-options [^RebalanceOptions rebalance-options] - (-> {:action :rebalance} - (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options))) - (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options))) - (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options)))))) - -(defn thriftify-rebalance-options [rebalance-options] - (if rebalance-options - (let [thrift-rebalance-options (RebalanceOptions.)] - (if (:delay-secs rebalance-options) - (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options)))) - (if (:num-workers rebalance-options) - (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options)))) - (if (:component->executors rebalance-options) - (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options)))) - thrift-rebalance-options))) - -(defn clojurify-kill-options [^KillOptions kill-options] - (-> {:action :kill} - (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) 
(.get_wait_secs kill-options))))) - -(defn thriftify-kill-options [kill-options] - (if kill-options - (let [thrift-kill-options (KillOptions.)] - (if (:delay-secs kill-options) - (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options)))) - thrift-kill-options))) - -(defn thriftify-topology-action-options [storm-base] - (if (:topology-action-options storm-base) - (let [ topology-action-options (:topology-action-options storm-base) - action (:action topology-action-options) - thrift-topology-action-options (TopologyActionOptions.)] - (if (= action :kill) - (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options))) - (if (= action :rebalance) - (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options))) - thrift-topology-action-options))) - -(defn clojurify-topology-action-options [^TopologyActionOptions topology-action-options] - (if topology-action-options - (or (and (.is_set_kill_options topology-action-options) - (clojurify-kill-options - (.get_kill_options topology-action-options))) - (and (.is_set_rebalance_options topology-action-options) - (clojurify-rebalance-options - (.get_rebalance_options topology-action-options)))))) - -(defn clojurify-debugoptions [^DebugOptions options] - (if options - { - :enable (.is_enable options) - :samplingpct (.get_samplingpct options) - } - )) - -(defn thriftify-debugoptions [options] - (doto (DebugOptions.) - (.set_enable (get options :enable false)) - (.set_samplingpct (get options :samplingpct 10)))) - -(defn thriftify-storm-base [storm-base] - (doto (StormBase.) 
- (.set_name (:storm-name storm-base)) - (.set_launch_time_secs (int (:launch-time-secs storm-base))) - (.set_status (convert-to-status-from-symbol (:status storm-base))) - (.set_num_workers (int (:num-workers storm-base))) - (.set_component_executors (map-val int (:component->executors storm-base))) - (.set_owner (:owner storm-base)) - (.set_topology_action_options (thriftify-topology-action-options storm-base)) - (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base))) - (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base))))) - -(defn clojurify-storm-base [^StormBase storm-base] - (if storm-base - (backtype.storm.daemon.common.StormBase. - (.get_name storm-base) - (.get_launch_time_secs storm-base) - (convert-to-symbol-from-status (.get_status storm-base)) - (.get_num_workers storm-base) - (into {} (.get_component_executors storm-base)) - (.get_owner storm-base) - (clojurify-topology-action-options (.get_topology_action_options storm-base)) - (convert-to-symbol-from-status (.get_prev_status storm-base)) - (map-val clojurify-debugoptions (.get_component_debug storm-base))))) - -(defn thriftify-stats [stats] - (if stats - (map-val thriftify-executor-stats - (map-key #(ExecutorInfo. (int (first %1)) (int (last %1))) - stats)) - {})) - -(defn clojurify-stats [stats] - (if stats - (map-val clojurify-executor-stats - (map-key (fn [x] (list (.get_task_start x) (.get_task_end x))) - stats)) - {})) - -(defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb] - (if worker-hb - {:storm-id (.get_storm_id worker-hb) - :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb))) - :uptime (.get_uptime_secs worker-hb) - :time-secs (.get_time_secs worker-hb) - } - {})) - -(defn thriftify-zk-worker-hb [worker-hb] - (if (not-empty (filter second (:executor-stats worker-hb))) - (doto (ClusterWorkerHeartbeat.) 
- (.set_uptime_secs (:uptime worker-hb)) - (.set_storm_id (:storm-id worker-hb)) - (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb)))) - (.set_time_secs (:time-secs worker-hb))))) - -(defn clojurify-error [^ErrorInfo error] - (if error - { - :error (.get_error error) - :time-secs (.get_error_time_secs error) - :host (.get_host error) - :port (.get_port error) - } - )) - -(defn thriftify-error [error] - (doto (ErrorInfo. (:error error) (:time-secs error)) - (.set_host (:host error)) - (.set_port (:port error)))) - -(defn clojurify-profile-request - [^ProfileRequest request] - (when request - {:host (.get_node (.get_nodeInfo request)) - :port (first (.get_port (.get_nodeInfo request))) - :action (.get_action request) - :timestamp (.get_time_stamp request)})) - -(defn thriftify-profile-request - [profile-request] - (let [nodeinfo (doto (NodeInfo.) - (.set_node (:host profile-request)) - (.set_port (set [(:port profile-request)]))) - request (ProfileRequest. nodeinfo (:action profile-request))] - (.set_time_stamp request (:timestamp profile-request)) - request)) - -(defn thriftify-credentials [credentials] - (doto (Credentials.) - (.set_creds (if credentials credentials {})))) - -(defn clojurify-crdentials [^Credentials credentials] - (if credentials - (into {} (.get_creds credentials)) - nil - )) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/acker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/acker.clj b/storm-core/src/clj/backtype/storm/daemon/acker.clj deleted file mode 100644 index ce88d11..0000000 --- a/storm-core/src/clj/backtype/storm/daemon/acker.clj +++ /dev/null @@ -1,107 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. 
See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.daemon.acker - (:import [backtype.storm.task OutputCollector TopologyContext IBolt]) - (:import [backtype.storm.tuple Tuple Fields]) - (:import [backtype.storm.utils RotatingMap MutableObject]) - (:import [java.util List Map]) - (:import [backtype.storm Constants]) - (:use [backtype.storm config util log]) - (:gen-class - :init init - :implements [backtype.storm.task.IBolt] - :constructors {[] []} - :state state )) - -(def ACKER-COMPONENT-ID "__acker") -(def ACKER-INIT-STREAM-ID "__ack_init") -(def ACKER-ACK-STREAM-ID "__ack_ack") -(def ACKER-FAIL-STREAM-ID "__ack_fail") - -(defn- update-ack [curr-entry val] - (let [old (get curr-entry :val 0)] - (assoc curr-entry :val (bit-xor old val)) - )) - -(defn- acker-emit-direct [^OutputCollector collector ^Integer task ^String stream ^List values] - (.emitDirect collector task stream values) - ) - -(defn mk-acker-bolt [] - (let [output-collector (MutableObject.) - pending (MutableObject.)] - (reify IBolt - (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector] - (.setObject output-collector collector) - (.setObject pending (RotatingMap. 
2)) - ) - (^void execute [this ^Tuple tuple] - (let [^RotatingMap pending (.getObject pending) - stream-id (.getSourceStreamId tuple)] - (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID) - (.rotate pending) - (let [id (.getValue tuple 0) - ^OutputCollector output-collector (.getObject output-collector) - curr (.get pending id) - curr (condp = stream-id - ACKER-INIT-STREAM-ID (-> curr - (update-ack (.getValue tuple 1)) - (assoc :spout-task (.getValue tuple 2))) - ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1)) - ACKER-FAIL-STREAM-ID (assoc curr :failed true))] - (.put pending id curr) - (when (and curr (:spout-task curr)) - (cond (= 0 (:val curr)) - (do - (.remove pending id) - (acker-emit-direct output-collector - (:spout-task curr) - ACKER-ACK-STREAM-ID - [id] - )) - (:failed curr) - (do - (.remove pending id) - (acker-emit-direct output-collector - (:spout-task curr) - ACKER-FAIL-STREAM-ID - [id] - )) - )) - (.ack output-collector tuple) - )))) - (^void cleanup [this] - ) - ))) - -(defn -init [] - [[] (container)]) - -(defn -prepare [this conf context collector] - (let [^IBolt ret (mk-acker-bolt)] - (container-set! 
(.state ^backtype.storm.daemon.acker this) ret) - (.prepare ret conf context collector) - )) - -(defn -execute [this tuple] - (let [^IBolt delegate (container-get (.state ^backtype.storm.daemon.acker this))] - (.execute delegate tuple) - )) - -(defn -cleanup [this] - (let [^IBolt delegate (container-get (.state ^backtype.storm.daemon.acker this))] - (.cleanup delegate) - )) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj deleted file mode 100644 index 0caa0b9..0000000 --- a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj +++ /dev/null @@ -1,98 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns backtype.storm.daemon.builtin-metrics - (:import [backtype.storm.metric.api CountMetric StateMetric IMetric IStatefulObject]) - (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]) - (:import [backtype.storm Config]) - (:use [backtype.storm.stats])) - -(defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count - ^MultiLatencyStatAndMetric complete-latency - ^MultiCountStatAndMetric fail-count - ^MultiCountStatAndMetric emit-count - ^MultiCountStatAndMetric transfer-count]) -(defrecord BuiltinBoltMetrics [^MultiCountStatAndMetric ack-count - ^MultiLatencyStatAndMetric process-latency - ^MultiCountStatAndMetric fail-count - ^MultiCountStatAndMetric execute-count - ^MultiLatencyStatAndMetric execute-latency - ^MultiCountStatAndMetric emit-count - ^MultiCountStatAndMetric transfer-count]) -(defrecord SpoutThrottlingMetrics [^CountMetric skipped-max-spout - ^CountMetric skipped-throttle - ^CountMetric skipped-inactive]) - - -(defn make-data [executor-type stats] - (condp = executor-type - :spout (BuiltinSpoutMetrics. (stats-acked stats) - (stats-complete-latencies stats) - (stats-failed stats) - (stats-emitted stats) - (stats-transferred stats)) - :bolt (BuiltinBoltMetrics. (stats-acked stats) - (stats-process-latencies stats) - (stats-failed stats) - (stats-executed stats) - (stats-execute-latencies stats) - (stats-emitted stats) - (stats-transferred stats)))) - -(defn make-spout-throttling-data [] - (SpoutThrottlingMetrics. (CountMetric.) - (CountMetric.) 
- (CountMetric.))) - -(defn register-spout-throttling-metrics [throttling-metrics storm-conf topology-context] - (doseq [[kw imetric] throttling-metrics] - (.registerMetric topology-context (str "__" (name kw)) imetric - (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))) - -(defn register-all [builtin-metrics storm-conf topology-context] - (doseq [[kw imetric] builtin-metrics] - (.registerMetric topology-context (str "__" (name kw)) imetric - (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))) - -(defn register-iconnection-server-metric [server storm-conf topology-context] - (if (instance? IStatefulObject server) - (.registerMetric topology-context "__recv-iconnection" (StateMetric. server) - (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))) - -(defn register-iconnection-client-metrics [node+port->socket-ref storm-conf topology-context] - (.registerMetric topology-context "__send-iconnection" - (reify IMetric - (^Object getValueAndReset [this] - (into {} - (map - (fn [[node+port ^IStatefulObject connection]] [node+port (.getState connection)]) - (filter - (fn [[node+port connection]] (instance? IStatefulObject connection)) - @node+port->socket-ref))))) - (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))) - -(defn register-queue-metrics [queues storm-conf topology-context] - (doseq [[qname q] queues] - (.registerMetric topology-context (str "__" (name qname)) (StateMetric. q) - (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))) - -(defn skipped-max-spout! [^SpoutThrottlingMetrics m stats] - (-> m .skipped-max-spout (.incrBy (stats-rate stats)))) - -(defn skipped-throttle! [^SpoutThrottlingMetrics m stats] - (-> m .skipped-throttle (.incrBy (stats-rate stats)))) - -(defn skipped-inactive! 
[^SpoutThrottlingMetrics m stats] - (-> m .skipped-inactive (.incrBy (stats-rate stats)))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj deleted file mode 100644 index 9b3aab3..0000000 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ /dev/null @@ -1,402 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
(ns backtype.storm.daemon.common
  (:use [backtype.storm log config util])
  (:import [backtype.storm.generated StormTopology
            InvalidTopologyException GlobalStreamId]
           [backtype.storm.utils ThriftTopologyUtils])
  (:import [backtype.storm.utils Utils])
  (:import [backtype.storm.task WorkerTopologyContext])
  (:import [backtype.storm Constants])
  (:import [backtype.storm.metric SystemBolt])
  (:import [backtype.storm.metric EventLoggerBolt])
  (:import [backtype.storm.security.auth IAuthorizer])
  (:import [java.io InterruptedIOException])
  (:require [clojure.set :as set])
  (:require [backtype.storm.daemon.acker :as acker])
  (:require [backtype.storm.thrift :as thrift])
  (:require [metrics.reporters.jmx :as jmx]))

(defn start-metrics-reporters
  "Creates a JMX reporter with an empty options map and starts it."
  []
  (jmx/start (jmx/reporter {})))

;; Acker ids are re-exported from the acker namespace so callers can refer to
;; them through this namespace.
(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)

(def SYSTEM-STREAM-ID "__system")

(def EVENTLOGGER-COMPONENT-ID "__eventlogger")
(def EVENTLOGGER-STREAM-ID "__eventlog")

(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
(def CREDENTIALS-CHANGED-STREAM-ID Constants/CREDENTIALS_CHANGED_STREAM_ID)

;; The task id is the virtual port.
;; node->host is here so that tasks know who to talk to just from the assignment;
;; this avoids the situation where a node goes down and a task doesn't know
;; what to do information-wise.
(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources])


;; component->executors is a map from spout/bolt id to number of executors for that component
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner
                      topology-action-options prev-status component->debug])

(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])

(defprotocol DaemonCommon
  (waiting? [this]))

(defrecord ExecutorStats [^long processed
                          ^long acked
                          ^long emitted
                          ^long transferred
                          ^long failed])

(defn new-executor-stats
  "Returns an ExecutorStats record with every counter set to 0."
  []
  (ExecutorStats. 0 0 0 0 0))

(defn get-storm-id
  "Returns the first active storm id whose stored base has :storm-name equal
   to storm-name, as selected by find-first."
  [storm-cluster-state storm-name]
  (let [active-storms (.active-storms storm-cluster-state)]
    (find-first
      #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
      active-storms)))

(defn topology-bases
  "Returns a map from each active storm id to its storm base."
  [storm-cluster-state]
  (let [active-topologies (.active-storms storm-cluster-state)]
    (into {}
          (dofor [id active-topologies]
            [id (.storm-base storm-cluster-state id nil)]))))

(defn validate-distributed-mode!
  "Throws IllegalArgumentException when conf describes local mode."
  [conf]
  (when (local-mode? conf)
    (throw
      (IllegalArgumentException. "Cannot start server in local mode!"))))

(defmacro defserverfn
  "Defines a variadic function `name` whose implementation is (fn ~@body).
   InterruptedIOException and InterruptedException are rethrown unchanged;
   any other Throwable is logged and the process is exited with code 13."
  [name & body]
  `(let [exec-fn# (fn ~@body)]
     (defn ~name [& args#]
       (try-cause
         (apply exec-fn# args#)
         (catch InterruptedIOException e#
           (throw e#))
         (catch InterruptedException e#
           (throw e#))
         (catch Throwable t#
           (log-error t# "Error on initialization of server " ~(str name))
           (exit-process! 13 "Error on initialization"))))))

(defn- validate-ids!
  "Throws InvalidTopologyException when the topology fields share an id, or
   when a component id or declared stream id is a system id. Fields flagged by
   ThriftTopologyUtils/isWorkerHook are skipped for the system-id checks."
  [^StormTopology topology]
  (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
        offending (apply any-intersection sets)]
    (when-not (empty? offending)
      (throw (InvalidTopologyException.
               (str "Duplicate component ids: " offending))))
    (doseq [f thrift/STORM-TOPOLOGY-FIELDS
            :let [obj-map (.getFieldValue topology f)]]
      (when-not (ThriftTopologyUtils/isWorkerHook f)
        (doseq [id (keys obj-map)]
          (when (Utils/isSystemId id)
            (throw (InvalidTopologyException.
                     (str id " is not a valid component id")))))
        (doseq [obj (vals obj-map)
                id (-> obj .get_common .get_streams keys)]
          (when (Utils/isSystemId id)
            (throw (InvalidTopologyException.
                     (str id " is not a valid stream id")))))))))

(defn all-components
  "Merges the values of every non-worker-hook topology field into one map of
   component id -> component."
  [^StormTopology topology]
  (apply merge {}
         (for [f thrift/STORM-TOPOLOGY-FIELDS]
           (if-not (ThriftTopologyUtils/isWorkerHook f)
             (.getFieldValue topology f)))))

(defn component-conf
  "Parses the JSON conf stored on the component's common object."
  [component]
  (->> component
       .get_common
       .get_json_conf
       from-json))

(defn validate-basic!
  "Runs the id checks, rejects spouts that declare inputs, and rejects any
   component that has a positive task count together with a non-positive
   parallelism hint. Throws InvalidTopologyException on the first violation."
  [^StormTopology topology]
  (validate-ids! topology)
  (doseq [f thrift/SPOUT-FIELDS
          obj (->> f (.getFieldValue topology) vals)]
    (when-not (empty? (-> obj .get_common .get_inputs))
      (throw (InvalidTopologyException. "May not declare inputs for a spout"))))
  (doseq [[_ component] (all-components topology)
          :let [conf (component-conf component)
                ;; `get` tolerates a nil conf and a missing key; invoking the
                ;; conf as a function and comparing the result with `>` would
                ;; throw NullPointerException in either case.
                num-tasks (get conf TOPOLOGY-TASKS)
                p (-> component .get_common thrift/parallelism-hint)]]
    (when (and num-tasks
               (> num-tasks 0)
               p
               (<= p 0))
      (throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0")))))

(defn validate-structure!
  "Validates that every component subscribes to a component+stream which
   actually exists in the topology and, for a fields grouping, that the
   grouped fields exist on that stream. Throws InvalidTopologyException on the
   first violation."
  [^StormTopology topology]
  (let [components (all-components topology)]
    (doseq [[id component] components
            :let [inputs (.. component get_common get_inputs)]]
      (doseq [[global-stream-id grouping] inputs
              :let [source-component-id (.get_componentId global-stream-id)
                    source-stream-id (.get_streamId global-stream-id)]]
        ;; Each check throws, so later checks only run once earlier ones passed.
        (when-not (contains? components source-component-id)
          (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent component [" source-component-id "]"))))
        (let [source-streams (-> components (get source-component-id) .get_common .get_streams)]
          (when-not (contains? source-streams source-stream-id)
            (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent stream: [" source-stream-id "] of component [" source-component-id "]"))))
          (when (= :fields (thrift/grouping-type grouping))
            (let [grouping-fields (set (.get_fields grouping))
                  source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set)
                  diff-fields (set/difference grouping-fields source-stream-fields)]
              (when-not (empty? diff-fields)
                (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))

(defn acker-inputs
  "Builds the input map handed to the acker bolt spec. Keys are
   [component-id stream-id] pairs: the init stream of every spout and the
   ack/fail streams of every bolt. Every value is [\"id\"]."
  [^StormTopology topology]
  (let [bolt-ids (.. topology get_bolts keySet)
        spout-ids (.. topology get_spouts keySet)
        spout-inputs (apply merge
                            (for [id spout-ids]
                              {[id ACKER-INIT-STREAM-ID] ["id"]}))
        bolt-inputs (apply merge
                           (for [id bolt-ids]
                             {[id ACKER-ACK-STREAM-ID] ["id"]
                              [id ACKER-FAIL-STREAM-ID] ["id"]}))]
    (merge spout-inputs bolt-inputs)))

;; The event logger receives inputs from all the spouts and bolts
;; with a fields grouping on component id, so that all tuples from a component
;; go to the same executor and can be viewed via logviewer.
(defn eventlogger-inputs
  "Builds the input map handed to the event logger bolt spec: one
   [component-id EVENTLOGGER-STREAM-ID] key per spout and bolt, each mapped to
   [\"component-id\"]."
  [^StormTopology topology]
  (let [bolt-ids (.. topology get_bolts keySet)
        spout-ids (.. topology get_spouts keySet)
        spout-inputs (apply merge
                            (for [id spout-ids]
                              {[id EVENTLOGGER-STREAM-ID] ["component-id"]}))
        bolt-inputs (apply merge
                           (for [id bolt-ids]
                             {[id EVENTLOGGER-STREAM-ID] ["component-id"]}))]
    (merge spout-inputs bolt-inputs)))

(defn add-acker!
  "Mutates topology `ret` in place: declares the ack/fail streams on every
   bolt, declares the init stream and acker inputs on every spout, and adds
   the acker bolt itself. The executor count is TOPOLOGY-ACKER-EXECUTORS,
   falling back to TOPOLOGY-WORKERS when that key is nil."
  [storm-conf ^StormTopology ret]
  (let [configured-executors (storm-conf TOPOLOGY-ACKER-EXECUTORS)
        num-executors (if (nil? configured-executors)
                        (storm-conf TOPOLOGY-WORKERS)
                        configured-executors)
        acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
                                         (new backtype.storm.daemon.acker)
                                         {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
                                          ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])}
                                         :p num-executors
                                         :conf {TOPOLOGY-TASKS num-executors
                                                TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
    ;; These loops run purely for their side effects, hence doseq.
    (doseq [[_ bolt] (.get_bolts ret)
            :let [common (.get_common bolt)]]
      (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
      (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"])))
    (doseq [[_ spout] (.get_spouts ret)
            :let [common (.get_common spout)
                  spout-conf (merge
                               (component-conf spout)
                               {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
      ;; this sets up tick tuples to cause timeouts to be triggered
      (.set_json_conf common (to-json spout-conf))
      (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
      (.put_to_inputs common
                      (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
                      (thrift/mk-direct-grouping))
      (.put_to_inputs common
                      (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
                      (thrift/mk-direct-grouping)))
    ;; Register the bolt under the same id the spouts subscribe to above.
    (.put_to_bolts ret ACKER-COMPONENT-ID acker-bolt)))

(defn add-metric-streams!
  "Declares METRICS-STREAM-ID, with fields task-info and data-points, on every
   component of the topology. Mutates the topology in place."
  [^StormTopology topology]
  (doseq [[_ component] (all-components topology)
          :let [common (.get_common component)]]
    (.put_to_streams common METRICS-STREAM-ID
                     (thrift/output-fields ["task-info" "data-points"]))))

(defn add-system-streams!
  "Declares SYSTEM-STREAM-ID, with the single field event, on every component
   of the topology. Mutates the topology in place."
  [^StormTopology topology]
  (doseq [[_ component] (all-components topology)
          :let [common (.get_common component)]]
    (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))


(defn map-occurrences
  "Calls (afn x n) for each x in coll, where n is how many times x has been
   seen so far (1 for the first occurrence), and returns the results in the
   original order."
  [afn coll]
  (->> coll
       (reduce (fn [[counts new-coll] x]
                 (let [occurs (inc (get counts x 0))]
                   [(assoc counts x occurs) (cons (afn x occurs) new-coll)]))
               [{} []])
       (second)
       ;; results were consed onto the front, so restore input order
       (reverse)))

(defn number-duplicates
  "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
  [coll]
  (map-occurrences (fn [x occurrences] (if (>= occurrences 2) (str x "#" occurrences) x)) coll))

(defn metrics-consumer-register-ids
  "Generates a list of component ids for each metrics consumer
   e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] "
  [storm-conf]
  (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
       (map #(get % "class"))
       (number-duplicates)
       (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))

(defn metrics-consumer-bolt-specs
  "Returns a sequence of [component-id bolt-spec] pairs, one per entry of
   TOPOLOGY-METRICS-CONSUMER-REGISTER. Each bolt shuffle-subscribes to the
   metrics stream of the system component and of every topology component.
   Parallelism defaults to 1 when \"parallelism.hint\" is absent."
  [storm-conf topology]
  (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
        inputs (->> (for [comp-id component-ids-that-emit-metrics]
                      {[comp-id METRICS-STREAM-ID] :shuffle})
                    (into {}))

        mk-bolt-spec (fn [consumer-class arg p]
                       (thrift/mk-bolt-spec*
                         inputs
                         (backtype.storm.metric.MetricsConsumerBolt. consumer-class arg)
                         {} :p p :conf {TOPOLOGY-TASKS p}))]

    (map
      (fn [component-id register]
        [component-id (mk-bolt-spec (get register "class")
                                    (get register "argument")
                                    (or (get register "parallelism.hint") 1))])

      (metrics-consumer-register-ids storm-conf)
      (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))

;; return the fields that event logger bolt expects
(defn eventlogger-bolt-fields []
  [(EventLoggerBolt/FIELD_COMPONENT_ID) (EventLoggerBolt/FIELD_MESSAGE_ID) (EventLoggerBolt/FIELD_TS) (EventLoggerBolt/FIELD_VALUES)])

(defn add-eventlogger!
  "Mutates topology `ret` in place: declares EVENTLOGGER-STREAM-ID on every
   component and adds the event logger bolt. The executor count is
   TOPOLOGY-EVENTLOGGER-EXECUTORS, falling back to TOPOLOGY-WORKERS when that
   key is nil."
  [storm-conf ^StormTopology ret]
  (let [configured-executors (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)
        num-executors (if (nil? configured-executors)
                        (storm-conf TOPOLOGY-WORKERS)
                        configured-executors)
        eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret)
                                               (EventLoggerBolt.)
                                               {}
                                               :p num-executors
                                               :conf {TOPOLOGY-TASKS num-executors
                                                      TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]

    (doseq [[_ component] (all-components ret)
            :let [common (.get_common component)]]
      (.put_to_streams common EVENTLOGGER-STREAM-ID (thrift/output-fields (eventlogger-bolt-fields))))
    (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)))

(defn add-metric-components!
  "Adds every metrics consumer bolt spec to the topology, mutating it in place."
  [storm-conf ^StormTopology topology]
  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
    (.put_to_bolts topology comp-id bolt-spec)))

(defn add-system-components!
  "Adds the system bolt, declared with parallelism 0 and 0 tasks, to the
   topology in place. `conf` is accepted but not read."
  [conf ^StormTopology topology]
  (let [system-bolt-spec (thrift/mk-bolt-spec*
                           {}
                           (SystemBolt.)
                           {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
                            METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])
                            CREDENTIALS-CHANGED-STREAM-ID (thrift/output-fields ["creds"])}
                           :p 0
                           :conf {TOPOLOGY-TASKS 0})]
    (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))

(defn system-topology!
  "Validates the user topology, then returns a deep copy augmented with the
   acker, event logger, metrics consumers, system bolt and the metric/system
   streams. The argument itself is not modified; the copy is structurally
   validated before it is returned."
  [storm-conf ^StormTopology topology]
  (validate-basic! topology)
  (let [ret (.deepCopy topology)]
    (add-acker! storm-conf ret)
    (add-eventlogger! storm-conf ret)
    (add-metric-components! storm-conf ret)
    (add-system-components! storm-conf ret)
    (add-metric-streams! ret)
    (add-system-streams! ret)
    (validate-structure! ret)
    ret))

(defn has-ackers?
  "True when TOPOLOGY-ACKER-EXECUTORS is nil or greater than 0."
  [storm-conf]
  (let [num-ackers (storm-conf TOPOLOGY-ACKER-EXECUTORS)]
    (or (nil? num-ackers) (> num-ackers 0))))

(defn has-eventloggers?
  "True when TOPOLOGY-EVENTLOGGER-EXECUTORS is nil or greater than 0."
  [storm-conf]
  (let [num-eventloggers (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)]
    (or (nil? num-eventloggers) (> num-eventloggers 0))))

(defn num-start-executors
  "Returns the parallelism hint of the component's common object."
  [component]
  (thrift/parallelism-hint (.get_common component)))

(defn storm-task-info
  "Returns map from task -> component id"
  [^StormTopology user-topology storm-conf]
  (->> (system-topology! storm-conf user-topology)
       all-components
       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
       ;; sort by component id so task numbering is deterministic
       (sort-by first)
       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
       ;; task ids are ints counting up from 1
       (map (fn [task-id component-id] [task-id component-id]) (iterate (comp int inc) (int 1)))
       (into {})))

(defn executor-id->tasks
  "Expands an executor id [first-task-id last-task-id] into the inclusive
   sequence of int task ids it covers."
  [[first-task-id last-task-id]]
  (->> (range first-task-id (inc last-task-id))
       (map int)))

(defn worker-context
  "Builds a WorkerTopologyContext from the entries of the worker map."
  [worker]
  (WorkerTopologyContext. (:system-topology worker)
                          (:storm-conf worker)
                          (:task->component worker)
                          (:component->sorted-tasks worker)
                          (:component->stream->fields worker)
                          (:storm-id worker)
                          (supervisor-storm-resources-path
                            (supervisor-stormdist-root (:conf worker) (:storm-id worker)))
                          (worker-pids-root (:conf worker) (:worker-id worker))
                          (:port worker)
                          (:task-ids worker)
                          (:default-shared-resources worker)
                          (:user-shared-resources worker)))


(defn to-task->node+port
  "Expands a map of executor id -> node+port into a map of task id -> node+port."
  [executor->node+port]
  (->> executor->node+port
       (mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port])))
       (into {})))

(defn mk-authorization-handler
  "Loads the class named klassname, instantiates it, calls .prepare on it with
   conf, and returns the instance. Returns nil when klassname is nil."
  [klassname conf]
  (let [aznClass (when klassname (Class/forName klassname))
        aznHandler (when aznClass (.newInstance aznClass))]
    (when aznHandler (.prepare ^IAuthorizer aznHandler conf))
    (log-debug "authorization class name:" klassname
               " class:" aznClass
               " handler:" aznHandler)
    aznHandler))
