http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
new file mode 100644
index 0000000..d65c439
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -0,0 +1,331 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.config
+  (:import [java.io FileReader File IOException]
+           [org.apache.storm.generated StormTopology])
+  (:import [org.apache.storm Config])
+  (:import [org.apache.storm.utils Utils LocalState])
+  (:import [org.apache.storm.validation ConfigValidation])
+  (:import [org.apache.commons.io FileUtils])
+  (:require [clojure [string :as str]])
+  (:use [org.apache.storm log util]))
+
+(def RESOURCES-SUBDIR
+  "Name of the per-topology resources subdirectory (see supervisor-storm-resources-path)."
+  "resources")
+(def NIMBUS-DO-NOT-REASSIGN "NIMBUS-DO-NOT-REASSIGN")
+
+(defn- clojure-config-name
+  "Turns a Config field name such as STORM_LOCAL_DIR into the name of its
+  Clojure constant, STORM-LOCAL-DIR (upper-cased, underscores to dashes)."
+  [name]
+  (.replace (.toUpperCase name) "_" "-"))
+
+;; Define a Clojure var for every public field of Config, named by
+;; clojure-config-name and bound to that field's value, so that e.g.
+;; Config/STORM_CLUSTER_MODE is available here as STORM-CLUSTER-MODE.
+(doseq [f (seq (.getFields Config))]
+  (let [name (.getName f)
+        new-name (clojure-config-name name)]
+    (eval
+      `(def ~(symbol new-name) (. Config ~(symbol name))))))
+
+(def ALL-CONFIGS
+  "Values of every public field of Config."
+  (dofor [f (seq (.getFields Config))]
+    (.get f nil)))
+
+
+(defn cluster-mode
+  "Returns STORM-CLUSTER-MODE from conf as a keyword. Extra args are ignored."
+  [conf & args]
+  (keyword (conf STORM-CLUSTER-MODE)))
+
+(defn local-mode?
+  "True when STORM-CLUSTER-MODE is \"local\", false when it is \"distributed\";
+  throws IllegalArgumentException for any other value."
+  [conf]
+  (let [mode (conf STORM-CLUSTER-MODE)]
+    (condp = mode
+      "local" true
+      "distributed" false
+      (throw (IllegalArgumentException.
+               (str "Illegal cluster mode in conf: " mode))))))
+
+(defn sampling-rate
+  "Reciprocal of TOPOLOGY-STATS-SAMPLE-RATE, truncated to an int."
+  [conf]
+  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
+       (/ 1)
+       int))
+
+(defn mk-stats-sampler
+  "Builds a sampler with even-sampler using (sampling-rate conf)."
+  [conf]
+  (even-sampler (sampling-rate conf)))
+
+(defn read-default-config
+  "Reads the default config via Utils/readDefaultConfig, as Clojure data."
+  []
+  (clojurify-structure (Utils/readDefaultConfig)))
+
+(defn validate-configs-with-schemas
+  "Validates conf with ConfigValidation/validateFields."
+  [conf]
+  (ConfigValidation/validateFields conf))
+
+(defn read-storm-config
+  "Reads the config via Utils/readStormConfig, validates it, and returns it
+  as Clojure data."
+  []
+  (let [conf (clojurify-structure (Utils/readStormConfig))]
+    (validate-configs-with-schemas conf)
+    conf))
+
+(defn read-yaml-config
+  "Reads the config file `name` via Utils/findAndReadConfigFile, validates it,
+  and returns it as Clojure data. The 1-arity form passes must-exist = true."
+  ([name must-exist]
+   (let [conf (clojurify-structure (Utils/findAndReadConfigFile name must-exist))]
+     (validate-configs-with-schemas conf)
+     conf))
+  ([name]
+   ;; Forward `name`; calling (read-yaml-config true) would drop it and
+   ;; re-enter this same arity until the stack overflows.
+   (read-yaml-config name true)))
+
+(defn- resolve-dir-under-storm-home
+  "Returns path unchanged when it is absolute, otherwise joins it onto the
+  storm.home system property; when path is nil, returns storm.home joined
+  with default-subdir."
+  [path default-subdir]
+  (let [storm-home (System/getProperty "storm.home")]
+    (if path
+      (if (is-absolute-path? path) path (str storm-home file-path-separator path))
+      (str storm-home file-path-separator default-subdir))))
+
+(defn absolute-storm-local-dir
+  "STORM-LOCAL-DIR resolved against storm.home; defaults to <storm.home>/storm-local."
+  [conf]
+  (resolve-dir-under-storm-home (conf STORM-LOCAL-DIR) "storm-local"))
+
+(def LOG-DIR
+  "Canonical log directory path. Uses the storm.log.dir system property, else
+  storm.log.dir from read-storm-config, else <storm.home>/logs. Computed once,
+  when this namespace is loaded."
+  (.getCanonicalPath
+    (clojure.java.io/file (or (System/getProperty "storm.log.dir")
+                              (get (read-storm-config) "storm.log.dir")
+                              (str (System/getProperty "storm.home") file-path-separator "logs")))))
+
+(defn absolute-healthcheck-dir
+  "STORM-HEALTH-CHECK-DIR resolved against storm.home; defaults to <storm.home>/healthchecks."
+  [conf]
+  (resolve-dir-under-storm-home (conf STORM-HEALTH-CHECK-DIR) "healthchecks"))
+
+(defn master-local-dir
+  "<storm local dir>/nimbus, created (with parents) if missing."
+  [conf]
+  (let [ret (str (absolute-storm-local-dir conf) file-path-separator "nimbus")]
+    (FileUtils/forceMkdir (File. ret))
+    ret))
+
+(defn master-stormjar-key
+  "Key for a topology's jar; get-id-from-blob-key recovers topology-id from it."
+  [topology-id]
+  (str topology-id "-stormjar.jar"))
+
+(defn master-stormcode-key
+  "Key for a topology's serialized code; see get-id-from-blob-key."
+  [topology-id]
+  (str topology-id "-stormcode.ser"))
+
+(defn master-stormconf-key
+  "Key for a topology's serialized conf; see get-id-from-blob-key."
+  [topology-id]
+  (str topology-id "-stormconf.ser"))
+
+(defn master-stormdist-root
+  "<nimbus dir>/stormdist, or <nimbus dir>/stormdist/<storm-id>."
+  ([conf]
+   (str (master-local-dir conf) file-path-separator "stormdist"))
+  ([conf storm-id]
+   (str (master-stormdist-root conf) file-path-separator storm-id)))
+
+(defn master-tmp-dir
+  "<nimbus dir>/tmp, created if missing."
+  [conf]
+  (let [ret (str (master-local-dir conf) file-path-separator "tmp")]
+    (FileUtils/forceMkdir (File. ret))
+    ret))
+
+(defn read-supervisor-storm-conf-given-path
+  "Decodes the file at stormconf-path with Utils/fromCompressedJsonConf and
+  merges the result over conf."
+  [conf stormconf-path]
+  (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))))
+
+(defn master-storm-metafile-path
+  "<stormroot>/storm-code-distributor.meta"
+  [stormroot]
+  (str stormroot file-path-separator "storm-code-distributor.meta"))
+
+(defn master-stormjar-path
+  "<stormroot>/stormjar.jar"
+  [stormroot]
+  (str stormroot file-path-separator "stormjar.jar"))
+
+(defn master-stormcode-path
+  "<stormroot>/stormcode.ser"
+  [stormroot]
+  (str stormroot file-path-separator "stormcode.ser"))
+
+(defn master-stormconf-path
+  "<stormroot>/stormconf.ser"
+  [stormroot]
+  (str stormroot file-path-separator "stormconf.ser"))
+
+(defn master-inbox
+  "<nimbus dir>/inbox, created if missing."
+  [conf]
+  (let [ret (str (master-local-dir conf) file-path-separator "inbox")]
+    (FileUtils/forceMkdir (File. ret))
+    ret))
+
+(defn master-inimbus-dir
+  "<nimbus dir>/inimbus (not created here)."
+  [conf]
+  (str (master-local-dir conf) file-path-separator "inimbus"))
+
+(defn supervisor-local-dir
+  "<storm local dir>/supervisor, created (with parents) if missing."
+  [conf]
+  (let [ret (str (absolute-storm-local-dir conf) file-path-separator "supervisor")]
+    (FileUtils/forceMkdir (File. ret))
+    ret))
+
+(defn supervisor-isupervisor-dir
+  "<supervisor dir>/isupervisor"
+  [conf]
+  (str (supervisor-local-dir conf) file-path-separator "isupervisor"))
+
+(defn supervisor-stormdist-root
+  "<supervisor dir>/stormdist, or that directory plus the URL-encoded storm-id."
+  ([conf]
+   (str (supervisor-local-dir conf) file-path-separator "stormdist"))
+  ([conf storm-id]
+   (str (supervisor-stormdist-root conf) file-path-separator (url-encode storm-id))))
+
+(defn supervisor-stormjar-path
+  "<stormroot>/stormjar.jar"
+  [stormroot]
+  (str stormroot file-path-separator "stormjar.jar"))
+
+(defn supervisor-storm-metafile-path
+  "<stormroot>/storm-code-distributor.meta"
+  [stormroot]
+  (str stormroot file-path-separator "storm-code-distributor.meta"))
+
+(defn supervisor-stormcode-path
+  "<stormroot>/stormcode.ser"
+  [stormroot]
+  (str stormroot file-path-separator "stormcode.ser"))
+
+(defn supervisor-stormconf-path
+  "<stormroot>/stormconf.ser"
+  [stormroot]
+  (str stormroot file-path-separator "stormconf.ser"))
+
+(defn supervisor-tmp-dir
+  "<supervisor dir>/tmp, created if missing."
+  [conf]
+  (let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")]
+    (FileUtils/forceMkdir (File. ret))
+    ret))
+
+(defn supervisor-storm-resources-path
+  "<stormroot>/<RESOURCES-SUBDIR>"
+  [stormroot]
+  (str stormroot file-path-separator RESOURCES-SUBDIR))
+
+(defn ^LocalState supervisor-state
+  "LocalState kept at <supervisor dir>/localstate."
+  [conf]
+  (LocalState. (str (supervisor-local-dir conf) file-path-separator "localstate")))
+
+(defn ^LocalState nimbus-topo-history-state
+  "LocalState kept at <nimbus dir>/history."
+  [conf]
+  (LocalState. (str (master-local-dir conf) file-path-separator "history")))
+
+(defn read-supervisor-storm-conf
+  "Reads storm-id's stormconf.ser from the supervisor stormdist dir, merged over conf."
+  [conf storm-id]
+  (let [stormroot (supervisor-stormdist-root conf storm-id)
+        conf-path (supervisor-stormconf-path stormroot)]
+    (read-supervisor-storm-conf-given-path conf conf-path)))
+
+(defn read-supervisor-topology
+  "Deserializes storm-id's StormTopology from the supervisor's stormcode.ser."
+  [conf storm-id]
+  (let [stormroot (supervisor-stormdist-root conf storm-id)
+        topology-path (supervisor-stormcode-path stormroot)]
+    (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)) StormTopology)))
+
+(defn worker-user-root
+  "<storm local dir>/workers-users"
+  [conf]
+  (str (absolute-storm-local-dir conf) "/workers-users"))
+
+(defn worker-user-file
+  "Path of worker-id's user file (written by set-worker-user!)."
+  [conf worker-id]
+  (str (worker-user-root conf) "/" worker-id))
+
+(defn get-worker-user
+  "Returns the trimmed contents of worker-id's user file. Returns nil, after
+  logging a warning, when reading fails with an IOException."
+  [conf worker-id]
+  (log-message "GET worker-user " worker-id)
+  (try
+    (str/trim (slurp (worker-user-file conf worker-id)))
+    (catch IOException e
+      (log-warn-error e "Failed to get worker user for " worker-id ".")
+      nil)))
+
+(defn get-id-from-blob-key
+  "Extracts the topology id from a key ending in -stormjar.jar, -stormcode.ser
+  or -stormconf.ser; returns nil for any other key."
+  [key]
+  (if-let [groups (re-find #"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)]
+    (nth groups 1)))
+
+(defn set-worker-user!
+  "Writes user into worker-id's user file, creating its parent directory first."
+  [conf worker-id user]
+  (log-message "SET worker-user " worker-id " " user)
+  (let [file (worker-user-file conf worker-id)]
+    (.mkdirs (.getParentFile (File. file)))
+    (spit file user)))
+
+(defn remove-worker-user!
+  "Deletes worker-id's user file; returns the result of File.delete."
+  [conf worker-id]
+  (log-message "REMOVE worker-user " worker-id)
+  (.delete (File. (worker-user-file conf worker-id))))
+
+(defn worker-artifacts-root
+  "Root of the worker artifacts. Uses STORM-WORKERS-ARTIFACTS-DIR, resolved
+  against LOG-DIR when relative; defaults to LOG-DIR/workers-artifacts. The
+  2- and 3-arity forms append a topology id and then a port."
+  ([conf]
+   (let [workers-artifacts-dir (conf STORM-WORKERS-ARTIFACTS-DIR)]
+     (if workers-artifacts-dir
+       (if (is-absolute-path? workers-artifacts-dir)
+         workers-artifacts-dir
+         (str LOG-DIR file-path-separator workers-artifacts-dir))
+       (str LOG-DIR file-path-separator "workers-artifacts"))))
+  ([conf id]
+   (str (worker-artifacts-root conf) file-path-separator id))
+  ([conf id port]
+   (str (worker-artifacts-root conf id) file-path-separator port)))
+
+(defn worker-artifacts-pid-path
+  "<artifacts root>/<id>/<port>/worker.pid"
+  [conf id port]
+  (str (worker-artifacts-root conf id port) file-path-separator "worker.pid"))
+
+(defn get-log-metadata-file
+  "Returns the File <artifacts root>/<id>/<port>/worker.yaml. The 1-arity form
+  splits fname on the platform file separator, uses its first two components
+  as id and port, and reads the config with read-storm-config."
+  ([fname]
+   ;; Quote the separator: on Windows it is a backslash, which re-pattern would
+   ;; reject as an incomplete escape sequence.
+   (let [[id port & _] (str/split fname (re-pattern (java.util.regex.Pattern/quote file-path-separator)))]
+     (get-log-metadata-file (read-storm-config) id port)))
+  ([conf id port]
+   (clojure.java.io/file (str (worker-artifacts-root conf id) file-path-separator port file-path-separator) "worker.yaml")))
+
+(defn get-worker-dir-from-root
+  "File for <log-root>/<id>/<port>."
+  [log-root id port]
+  (clojure.java.io/file (str log-root file-path-separator id file-path-separator port)))
+
+(defn worker-root
+  "<storm local dir>/workers, or <storm local dir>/workers/<id>."
+  ([conf]
+   (str (absolute-storm-local-dir conf) file-path-separator "workers"))
+  ([conf id]
+   (str (worker-root conf) file-path-separator id)))
+
+(defn worker-pids-root
+  "<worker dir>/pids"
+  [conf id]
+  (str (worker-root conf id) file-path-separator "pids"))
+
+(defn worker-pid-path
+  "<worker dir>/pids/<pid>"
+  [conf id pid]
+  (str (worker-pids-root conf id) file-path-separator pid))
+
+(defn worker-heartbeats-root
+  "<worker dir>/heartbeats"
+  [conf id]
+  (str (worker-root conf id) file-path-separator "heartbeats"))
+
+;; workers heartbeat here with pid and timestamp
+;; if supervisor stops receiving heartbeat, it kills and restarts the process
+;; in local mode, keep a global map of ids to threads for simulating process management
+(defn ^LocalState worker-state
+  "LocalState kept in the worker's heartbeats directory."
+  [conf id]
+  (LocalState. (worker-heartbeats-root conf id)))
+
+(defn override-login-config-with-system-property
+  "If the java.security.auth.login.config system property is set, assoc its
+  value into conf under the same key; otherwise return conf unchanged."
+  [conf]
+  (if-let [login_conf_file (System/getProperty "java.security.auth.login.config")]
+    (assoc conf "java.security.auth.login.config" login_conf_file)
+    conf))
+
+(defn- sorted-distinct-conf-values
+  "Sorted, de-duplicated, non-nil entries of the collections stored under
+  key-a and key-b in topology-conf."
+  [topology-conf key-a key-b]
+  (sort (distinct (remove nil?
+                          (concat
+                            (topology-conf key-a)
+                            (topology-conf key-b))))))
+
+(defn get-topo-logs-users
+  "Entries of LOGS-USERS and TOPOLOGY-USERS, sorted and de-duplicated."
+  [topology-conf]
+  (sorted-distinct-conf-values topology-conf LOGS-USERS TOPOLOGY-USERS))
+
+(defn get-topo-logs-groups
+  "Entries of LOGS-GROUPS and TOPOLOGY-GROUPS, sorted and de-duplicated."
+  [topology-conf]
+  (sorted-distinct-conf-values topology-conf LOGS-GROUPS TOPOLOGY-GROUPS))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
new file mode 100644
index 0000000..bb2dc87
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -0,0 +1,277 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.converter
+  (:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
+            StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
+            TopologyActionOptions DebugOptions ProfileRequest])
+  (:use [org.apache.storm util stats log])
+  (:require [org.apache.storm.daemon [common :as common]]))
+
+(defn thriftify-supervisor-info
+  "Converts a supervisor-info record/map into a thrift SupervisorInfo. Time,
+  uptime, used ports and meta entries are coerced to long."
+  [supervisor-info]
+  (doto (SupervisorInfo.)
+    (.set_time_secs (long (:time-secs supervisor-info)))
+    (.set_hostname (:hostname supervisor-info))
+    (.set_assignment_id (:assignment-id supervisor-info))
+    (.set_used_ports (map long (:used-ports supervisor-info)))
+    (.set_meta (map long (:meta supervisor-info)))
+    (.set_scheduler_meta (:scheduler-meta supervisor-info))
+    (.set_uptime_secs (long (:uptime-secs supervisor-info)))
+    (.set_version (:version supervisor-info))
+    (.set_resources_map (:resources-map supervisor-info))))
+
+(defn clojurify-supervisor-info
+  "Converts a thrift SupervisorInfo into a common/SupervisorInfo record; nil in,
+  nil out. Present ports/meta become vectors and present maps become Clojure maps."
+  [^SupervisorInfo supervisor-info]
+  (if supervisor-info
+    (org.apache.storm.daemon.common.SupervisorInfo.
+      (.get_time_secs supervisor-info)
+      (.get_hostname supervisor-info)
+      (.get_assignment_id supervisor-info)
+      (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info)))
+      (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
+      (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
+      (.get_uptime_secs supervisor-info)
+      (.get_version supervisor-info)
+      (if-let [res-map (.get_resources_map supervisor-info)] (into {} res-map)))))
+
+(defn- node+port->thrift-node-info
+  "Builds a NodeInfo from [node port ...]: the first element is the node and
+  the remaining elements are the ports (coerced to long, collected in a set)."
+  [node+port]
+  (NodeInfo. (first node+port) (set (map long (rest node+port)))))
+
+(defn thriftify-assignment
+  "Builds a thrift Assignment from an assignment record/map. Executor keys are
+  coerced to longs, [node port ...] values become NodeInfo, and, when
+  :worker->resources is present, each [mem-on-heap mem-off-heap cpu] value
+  becomes WorkerResources."
+  [assignment]
+  (let [thrift-assignment (doto (Assignment.)
+                            (.set_master_code_dir (:master-code-dir assignment))
+                            (.set_node_host (:node->host assignment))
+                            (.set_executor_node_port
+                              (into {}
+                                    (map (fn [[k v]]
+                                           [(map long k) (node+port->thrift-node-info v)])
+                                         (:executor->node+port assignment))))
+                            (.set_executor_start_time_secs
+                              (into {}
+                                    (map (fn [[k v]]
+                                           [(map long k) (long v)])
+                                         (:executor->start-time-secs assignment)))))]
+    (if (:worker->resources assignment)
+      (.set_worker_resources
+        thrift-assignment
+        (into {}
+              (map (fn [[node+port resources]]
+                     [(node+port->thrift-node-info node+port)
+                      (doto (WorkerResources.)
+                        (.set_mem_on_heap (first resources))
+                        (.set_mem_off_heap (second resources))
+                        (.set_cpu (last resources)))])
+                   (:worker->resources assignment)))))
+    thrift-assignment))
+
+(defn clojurify-executor->node_port
+  "Converts executor -> NodeInfo into executor-vector -> (node port ...)."
+  [executor->node_port]
+  (into {}
+    (map-val
+      (fn [nodeInfo]
+        (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
+      (map-key
+        (fn [list-of-executors]
+          (into [] list-of-executors)) ; list of executors must be converted to clojure vector to ensure it is sortable.
+        executor->node_port))))
+
+(defn clojurify-worker->resources
+  "Converts the map returned by Assignment/get_worker_resources: each NodeInfo
+  key becomes (node port ...) and each WorkerResources value becomes
+  [mem-on-heap mem-off-heap cpu]."
+  [worker->resources]
+  (into {} (map
+             (fn [[nodeInfo resources]]
+               [(concat [(.get_node nodeInfo)] (.get_port nodeInfo))
+                [(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]])
+             worker->resources)))
+
+(defn clojurify-assignment
+  "Converts a thrift Assignment into a common/Assignment record; nil in, nil out."
+  [^Assignment assignment]
+  (if assignment
+    (org.apache.storm.daemon.common.Assignment.
+      (.get_master_code_dir assignment)
+      (into {} (.get_node_host assignment))
+      (clojurify-executor->node_port (into {} (.get_executor_node_port assignment)))
+      (map-key (fn [executor] (into [] executor))
+               (into {} (.get_executor_start_time_secs assignment)))
+      (clojurify-worker->resources (into {} (.get_worker_resources assignment))))))
+
+(defn convert-to-symbol-from-status
+  "Maps a TopologyStatus to {:type :active|:inactive|:rebalancing|:killed};
+  nil for anything else."
+  [status]
+  (condp = status
+    TopologyStatus/ACTIVE {:type :active}
+    TopologyStatus/INACTIVE {:type :inactive}
+    TopologyStatus/REBALANCING {:type :rebalancing}
+    TopologyStatus/KILLED {:type :killed}
+    nil))
+
+(defn- convert-to-status-from-symbol
+  "Inverse of convert-to-symbol-from-status; nil for nil or an unknown :type."
+  [status]
+  (if status
+    (condp = (:type status)
+      :active TopologyStatus/ACTIVE
+      :inactive TopologyStatus/INACTIVE
+      :rebalancing TopologyStatus/REBALANCING
+      :killed TopologyStatus/KILLED
+      nil)))
+
+(defn clojurify-rebalance-options
+  "{:action :rebalance} plus :delay-secs, :num-workers and :component->executors
+  for whichever RebalanceOptions fields are set."
+  [^RebalanceOptions rebalance-options]
+  (-> {:action :rebalance}
+    (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options)))
+    (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
+    (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options))))))
+
+(defn thriftify-rebalance-options
+  "Builds RebalanceOptions, setting only the fields present in rebalance-options
+  (coerced to int); nil in, nil out."
+  [rebalance-options]
+  (if rebalance-options
+    (let [thrift-rebalance-options (RebalanceOptions.)]
+      (if (:delay-secs rebalance-options)
+        (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options))))
+      (if (:num-workers rebalance-options)
+        (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options))))
+      (if (:component->executors rebalance-options)
+        (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options))))
+      thrift-rebalance-options)))
+
+(defn clojurify-kill-options
+  "{:action :kill} plus :delay-secs when wait_secs is set."
+  [^KillOptions kill-options]
+  (-> {:action :kill}
+    (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options)))))
+
+(defn thriftify-kill-options
+  "Builds KillOptions, setting wait_secs from :delay-secs when present; nil in, nil out."
+  [kill-options]
+  (if kill-options
+    (let [thrift-kill-options (KillOptions.)]
+      (if (:delay-secs kill-options)
+        (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options))))
+      thrift-kill-options)))
+
+(defn thriftify-topology-action-options
+  "Wraps storm-base's :topology-action-options in TopologyActionOptions as kill
+  or rebalance options according to :action; nil when there are none."
+  [storm-base]
+  (if (:topology-action-options storm-base)
+    (let [topology-action-options (:topology-action-options storm-base)
+          action (:action topology-action-options)
+          thrift-topology-action-options (TopologyActionOptions.)]
+      (if (= action :kill)
+        (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options)))
+      (if (= action :rebalance)
+        (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options)))
+      thrift-topology-action-options)))
+
+(defn clojurify-topology-action-options
+  "Returns the clojurified kill options when set, otherwise the clojurified
+  rebalance options when set; nil in, nil out."
+  [^TopologyActionOptions topology-action-options]
+  (if topology-action-options
+    (or (and (.is_set_kill_options topology-action-options)
+             (clojurify-kill-options
+               (.get_kill_options topology-action-options)))
+        (and (.is_set_rebalance_options topology-action-options)
+             (clojurify-rebalance-options
+               (.get_rebalance_options topology-action-options))))))
+
+(defn clojurify-debugoptions
+  "DebugOptions -> {:enable bool :samplingpct pct}; nil in, nil out."
+  [^DebugOptions options]
+  (if options
+    {:enable (.is_enable options)
+     :samplingpct (.get_samplingpct options)}))
+
+(defn thriftify-debugoptions
+  "Builds DebugOptions; defaults are :enable false and :samplingpct 10."
+  [options]
+  (doto (DebugOptions.)
+    (.set_enable (get options :enable false))
+    (.set_samplingpct (get options :samplingpct 10))))
+
+(defn thriftify-storm-base
+  "Converts a storm-base record/map into a thrift StormBase."
+  [storm-base]
+  (doto (StormBase.)
+    (.set_name (:storm-name storm-base))
+    (.set_launch_time_secs (int (:launch-time-secs storm-base)))
+    (.set_status (convert-to-status-from-symbol (:status storm-base)))
+    (.set_num_workers (int (:num-workers storm-base)))
+    (.set_component_executors (map-val int (:component->executors storm-base)))
+    (.set_owner (:owner storm-base))
+    (.set_topology_action_options (thriftify-topology-action-options storm-base))
+    (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
+    (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
+
+(defn clojurify-storm-base
+  "Converts a thrift StormBase into a common/StormBase record; nil in, nil out."
+  [^StormBase storm-base]
+  (if storm-base
+    (org.apache.storm.daemon.common.StormBase.
+      (.get_name storm-base)
+      (.get_launch_time_secs storm-base)
+      (convert-to-symbol-from-status (.get_status storm-base))
+      (.get_num_workers storm-base)
+      (into {} (.get_component_executors storm-base))
+      (.get_owner storm-base)
+      (clojurify-topology-action-options (.get_topology_action_options storm-base))
+      (convert-to-symbol-from-status (.get_prev_status storm-base))
+      (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
+
+(defn thriftify-stats
+  "Keys [start ... end] become ExecutorInfo(first, last) and values go through
+  thriftify-executor-stats; nil stats yield {}."
+  [stats]
+  (if stats
+    (map-val thriftify-executor-stats
+      (map-key #(ExecutorInfo. (int (first %1)) (int (last %1)))
+        stats))
+    {}))
+
+(defn clojurify-stats
+  "ExecutorInfo keys become (task-start task-end) and values go through
+  clojurify-executor-stats; nil stats yield {}."
+  [stats]
+  (if stats
+    (map-val clojurify-executor-stats
+      (map-key (fn [x] (list (.get_task_start x) (.get_task_end x)))
+        stats))
+    {}))
+
+(defn clojurify-zk-worker-hb
+  "Converts a ClusterWorkerHeartbeat into a map; nil yields {}."
+  [^ClusterWorkerHeartbeat worker-hb]
+  (if worker-hb
+    {:storm-id (.get_storm_id worker-hb)
+     :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
+     :uptime (.get_uptime_secs worker-hb)
+     :time-secs (.get_time_secs worker-hb)}
+    {}))
+
+(defn thriftify-zk-worker-hb
+  "Builds a ClusterWorkerHeartbeat that keeps only executors with truthy stats.
+  Returns nil when no executor has stats."
+  [worker-hb]
+  ;; Filter once and reuse the result, rather than filtering for the emptiness
+  ;; check and again for the payload.
+  (let [live-stats (filter second (:executor-stats worker-hb))]
+    (if (not-empty live-stats)
+      (doto (ClusterWorkerHeartbeat.)
+        (.set_uptime_secs (:uptime worker-hb))
+        (.set_storm_id (:storm-id worker-hb))
+        (.set_executor_stats (thriftify-stats live-stats))
+        (.set_time_secs (:time-secs worker-hb))))))
+
+(defn clojurify-error
+  "ErrorInfo -> {:error :time-secs :host :port}; nil in, nil out."
+  [^ErrorInfo error]
+  (if error
+    {:error (.get_error error)
+     :time-secs (.get_error_time_secs error)
+     :host (.get_host error)
+     :port (.get_port error)}))
+
+(defn thriftify-error
+  "Builds an ErrorInfo from {:error :time-secs :host :port}."
+  [error]
+  (doto (ErrorInfo. (:error error) (:time-secs error))
+    (.set_host (:host error))
+    (.set_port (:port error))))
+
+(defn clojurify-profile-request
+  "ProfileRequest -> {:host :port :action :timestamp}, using the first port of
+  the request's NodeInfo; nil in, nil out."
+  [^ProfileRequest request]
+  (when request
+    {:host (.get_node (.get_nodeInfo request))
+     :port (first (.get_port (.get_nodeInfo request)))
+     :action (.get_action request)
+     :timestamp (.get_time_stamp request)}))
+
+(defn thriftify-profile-request
+  "Builds a ProfileRequest from {:host :port :action :timestamp}."
+  [profile-request]
+  (let [nodeinfo (doto (NodeInfo.)
+                   (.set_node (:host profile-request))
+                   (.set_port (set [(:port profile-request)])))
+        request (ProfileRequest. nodeinfo (:action profile-request))]
+    (.set_time_stamp request (:timestamp profile-request))
+    request))
+
+(defn thriftify-credentials
+  "Wraps a credentials map in Credentials; nil becomes an empty map."
+  [credentials]
+  (doto (Credentials.)
+    (.set_creds (if credentials credentials {}))))
+
+(defn clojurify-crdentials
+  "Credentials -> Clojure map; nil in, nil out. (The name is misspelled but
+  kept as-is for existing callers.)"
+  [^Credentials credentials]
+  (if credentials
+    (into {} (.get_creds credentials))
+    nil))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
new file mode 100644
index 0000000..7c4d614
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -0,0 +1,107 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.
See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.acker ; Acker bolt: XOR-tracks pending ids and reports ack/fail for each id directly to its spout task.
+  (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
+  (:import [org.apache.storm.tuple Tuple Fields])
+  (:import [org.apache.storm.utils RotatingMap MutableObject])
+  (:import [java.util List Map])
+  (:import [org.apache.storm Constants])
+  (:use [org.apache.storm config util log])
+  (:gen-class ; The generated class delegates every IBolt call to the bolt from mk-acker-bolt, held in :state (see -prepare).
+   :init init
+   :implements [org.apache.storm.task.IBolt]
+   :constructors {[] []}
+   :state state ))
+
+(def ACKER-COMPONENT-ID "__acker")
+(def ACKER-INIT-STREAM-ID "__ack_init") ; execute reads value 0 = id, 1 = XOR value, 2 = spout task
+(def ACKER-ACK-STREAM-ID "__ack_ack") ; inbound: value 0 = id, 1 = XOR value; also the outbound stream for completed ids
+(def ACKER-FAIL-STREAM-ID "__ack_fail") ; inbound: value 0 = id; also the outbound stream for failed ids
+
+(defn- update-ack [curr-entry val] ; Returns curr-entry with val XOR-ed into :val (a missing :val counts as 0).
+  (let [old (get curr-entry :val 0)]
+    (assoc curr-entry :val (bit-xor old val))
+    ))
+
+(defn- acker-emit-direct [^OutputCollector collector ^Integer task ^String stream ^List values] ; Emits values to one task on stream.
+  (.emitDirect collector task stream values)
+  )
+
+(defn mk-acker-bolt [] ; Builds the acker IBolt; collector and pending map are only filled in by prepare, hence the MutableObjects.
+  (let [output-collector (MutableObject.)
+        pending (MutableObject.)]
+    (reify IBolt
+      (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
+        (.setObject output-collector collector)
+        (.setObject pending (RotatingMap. 2)) ; NOTE(review): 2 buckets rotated on each tick below, presumably to expire stale entries; expiry lives in RotatingMap (not visible here)
+        )
+      (^void execute [this ^Tuple tuple]
+        (let [^RotatingMap pending (.getObject pending)
+              stream-id (.getSourceStreamId tuple)]
+          (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID) ; tick tuples only rotate the pending map and are not acked
+            (.rotate pending)
+            (let [id (.getValue tuple 0)
+                  ^OutputCollector output-collector (.getObject output-collector)
+                  curr (.get pending id) ; may be nil for an id not seen yet
+                  curr (condp = stream-id ; no default clause: any other stream throws IllegalArgumentException
+                         ACKER-INIT-STREAM-ID (-> curr ; init: fold in the XOR value and record the spout task
+                                                  (update-ack (.getValue tuple 1))
+                                                  (assoc :spout-task (.getValue tuple 2)))
+                         ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
+                         ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
+              (.put pending id curr)
+              (when (and curr (:spout-task curr)) ; resolve only once the init tuple has supplied :spout-task
+                (cond (= 0 (:val curr)) ; XOR back to 0: report ack (checked before :failed)
+                      (do
+                        (.remove pending id)
+                        (acker-emit-direct output-collector
+                                           (:spout-task curr)
+                                           ACKER-ACK-STREAM-ID
+                                           [id]
+                                           ))
+                      (:failed curr) ; marked failed: report fail
+                      (do
+                        (.remove pending id)
+                        (acker-emit-direct output-collector
+                                           (:spout-task curr)
+                                           ACKER-FAIL-STREAM-ID
+                                           [id]
+                                           ))
+                      ))
+              (.ack output-collector tuple) ; every non-tick input tuple is acked
+              ))))
+      (^void cleanup [this]
+        )
+      )))
+
+(defn -init [] ; gen-class :init: no superclass constructor args; :state starts as (container)
+  [[] (container)])
+
+(defn -prepare [this conf context collector] ; builds the real bolt, stores it in this instance's state, and delegates prepare
+  (let [^IBolt ret (mk-acker-bolt)]
+    (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
+    (.prepare ret conf context collector)
+    ))
+
+(defn -execute [this tuple] ; delegates to the bolt stored by -prepare
+  (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+    (.execute delegate tuple)
+    ))
+
+(defn -cleanup [this] ; delegates to the bolt stored by -prepare
+  (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+    (.cleanup delegate)
+    ))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
new file mode 100644
index 0000000..14d0132
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
@@ -0,0 +1,98 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.builtin-metrics
+  (:import [org.apache.storm.metric.api CountMetric StateMetric IMetric IStatefulObject])
+  (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
+  (:import [org.apache.storm Config])
+  (:use [org.apache.storm.stats]))
+
+;; Built-in spout metrics, filled from an executor's stats by make-data.
+(defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count
+                                ^MultiLatencyStatAndMetric complete-latency
+                                ^MultiCountStatAndMetric fail-count
+                                ^MultiCountStatAndMetric emit-count
+                                ^MultiCountStatAndMetric transfer-count])
+;; Built-in bolt metrics, filled from an executor's stats by make-data.
+(defrecord BuiltinBoltMetrics [^MultiCountStatAndMetric ack-count
+                               ^MultiLatencyStatAndMetric process-latency
+                               ^MultiCountStatAndMetric fail-count
+                               ^MultiCountStatAndMetric execute-count
+                               ^MultiLatencyStatAndMetric execute-latency
+                               ^MultiCountStatAndMetric emit-count
+                               ^MultiCountStatAndMetric transfer-count])
+;; Counters bumped by skipped-max-spout!, skipped-throttle! and skipped-inactive!.
+(defrecord SpoutThrottlingMetrics [^CountMetric skipped-max-spout
+                                   ^CountMetric skipped-throttle
+                                   ^CountMetric skipped-inactive])
+
+
+(defn make-data
+  "Builds the built-in metrics record for an executor from its stats:
+  BuiltinSpoutMetrics for :spout, BuiltinBoltMetrics for :bolt. Any other
+  executor-type throws IllegalArgumentException (no matching condp clause)."
+  [executor-type stats]
+  (condp = executor-type
+    :spout (BuiltinSpoutMetrics. (stats-acked stats)
+                                 (stats-complete-latencies stats)
+                                 (stats-failed stats)
+                                 (stats-emitted stats)
+                                 (stats-transferred stats))
+    :bolt (BuiltinBoltMetrics. (stats-acked stats)
+                               (stats-process-latencies stats)
+                               (stats-failed stats)
+                               (stats-executed stats)
+                               (stats-execute-latencies stats)
+                               (stats-emitted stats)
+                               (stats-transferred stats))))
+
+(defn make-spout-throttling-data
+  "A SpoutThrottlingMetrics holding three fresh CountMetrics."
+  []
+  (SpoutThrottlingMetrics. (CountMetric.)
+                           (CountMetric.)
+                           (CountMetric.)))
+
+(defn- builtin-metrics-bucket-secs
+  "TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS from storm-conf, as an int."
+  [storm-conf]
+  (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))
+
+(defn- register-keyed-metrics!
+  "Registers every [kw metric] entry of metrics on topology-context under the
+  name \"__\" + (name kw), using the built-in bucket size."
+  [metrics storm-conf topology-context]
+  (doseq [[kw imetric] metrics]
+    (.registerMetric topology-context (str "__" (name kw)) imetric
+                     (builtin-metrics-bucket-secs storm-conf))))
+
+(defn register-spout-throttling-metrics
+  "Registers each SpoutThrottlingMetrics counter as \"__<field name>\"."
+  [throttling-metrics storm-conf topology-context]
+  (register-keyed-metrics! throttling-metrics storm-conf topology-context))
+
+(defn register-all
+  "Registers each field of a built-in metrics record as \"__<field name>\"."
+  [builtin-metrics storm-conf topology-context]
+  (register-keyed-metrics! builtin-metrics storm-conf topology-context))
+
+(defn register-iconnection-server-metric
+  "Registers server as the \"__recv-iconnection\" StateMetric when it is an
+  IStatefulObject; otherwise does nothing."
+  [server storm-conf topology-context]
+  (if (instance? IStatefulObject server)
+    (.registerMetric topology-context "__recv-iconnection" (StateMetric. server)
+                     (builtin-metrics-bucket-secs storm-conf))))
+
+(defn register-iconnection-client-metrics
+  "Registers \"__send-iconnection\". Each read yields a map of node+port -> state
+  for every connection in @node+port->socket-ref that is an IStatefulObject."
+  [node+port->socket-ref storm-conf topology-context]
+  (.registerMetric topology-context "__send-iconnection"
+    (reify IMetric
+      (^Object getValueAndReset [this]
+        (into {}
+          (map
+            (fn [[node+port ^IStatefulObject connection]] [node+port (.getState connection)])
+            (filter
+              (fn [[node+port connection]] (instance? IStatefulObject connection))
+              @node+port->socket-ref)))))
+    (builtin-metrics-bucket-secs storm-conf)))
+
+(defn register-queue-metrics
+  "Registers a StateMetric for each [qname queue] entry as \"__<qname>\"."
+  [queues storm-conf topology-context]
+  (doseq [[qname q] queues]
+    (.registerMetric topology-context (str "__" (name qname)) (StateMetric. q)
+                     (builtin-metrics-bucket-secs storm-conf))))
+
+(defn skipped-max-spout!
+  "Adds (stats-rate stats) to m's skipped-max-spout counter."
+  [^SpoutThrottlingMetrics m stats]
+  (-> m .skipped-max-spout (.incrBy (stats-rate stats))))
+
+(defn skipped-throttle!
+  "Adds (stats-rate stats) to m's skipped-throttle counter."
+  [^SpoutThrottlingMetrics m stats]
+  (-> m .skipped-throttle (.incrBy (stats-rate stats))))
+
+(defn skipped-inactive!
+  "Adds (stats-rate stats) to m's skipped-inactive counter."
+  [^SpoutThrottlingMetrics m stats]
+  (-> m .skipped-inactive (.incrBy (stats-rate stats))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
new file mode 100644
index 0000000..dd761a5
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -0,0 +1,402 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.common + (:use [org.apache.storm log config util]) + (:import [org.apache.storm.generated StormTopology + InvalidTopologyException GlobalStreamId] + [org.apache.storm.utils ThriftTopologyUtils]) + (:import [org.apache.storm.utils Utils]) + (:import [org.apache.storm.task WorkerTopologyContext]) + (:import [org.apache.storm Constants]) + (:import [org.apache.storm.metric SystemBolt]) + (:import [org.apache.storm.metric EventLoggerBolt]) + (:import [org.apache.storm.security.auth IAuthorizer]) + (:import [java.io InterruptedIOException]) + (:require [clojure.set :as set]) + (:require [org.apache.storm.daemon.acker :as acker]) + (:require [org.apache.storm.thrift :as thrift]) + (:require [metrics.reporters.jmx :as jmx])) + +(defn start-metrics-reporters [] + (jmx/start (jmx/reporter {}))) + +(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID) +(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID) +(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID) +(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID) + +(def SYSTEM-STREAM-ID "__system") + +(def EVENTLOGGER-COMPONENT-ID "__eventlogger") +(def EVENTLOGGER-STREAM-ID "__eventlog") + +(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID) +(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID) +(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID) +(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID) +(def CREDENTIALS-CHANGED-STREAM-ID Constants/CREDENTIALS_CHANGED_STREAM_ID) + +;; the task id is the virtual port +;; node->host is here so that tasks know who to talk to just from assignment +;; this avoid situation where node goes down and task doesn't know what to do information-wise +(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources]) + + +;; component->executors is a map from spout/bolt id to number of executors for that component +(defrecord StormBase [storm-name launch-time-secs status num-workers 
component->executors owner topology-action-options prev-status component->debug]) + +(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map]) + +(defprotocol DaemonCommon + (waiting? [this])) + +(defrecord ExecutorStats [^long processed + ^long acked + ^long emitted + ^long transferred + ^long failed]) + +(defn new-executor-stats [] + (ExecutorStats. 0 0 0 0 0)) + +(defn get-storm-id [storm-cluster-state storm-name] + (let [active-storms (.active-storms storm-cluster-state)] + (find-first + #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil))) + active-storms) + )) + +(defn topology-bases [storm-cluster-state] + (let [active-topologies (.active-storms storm-cluster-state)] + (into {} + (dofor [id active-topologies] + [id (.storm-base storm-cluster-state id nil)] + )) + )) + +(defn validate-distributed-mode! [conf] + (if (local-mode? conf) + (throw + (IllegalArgumentException. "Cannot start server in local mode!")))) + +(defmacro defserverfn [name & body] + `(let [exec-fn# (fn ~@body)] + (defn ~name [& args#] + (try-cause + (apply exec-fn# args#) + (catch InterruptedIOException e# + (throw e#)) + (catch InterruptedException e# + (throw e#)) + (catch Throwable t# + (log-error t# "Error on initialization of server " ~(str name)) + (exit-process! 13 "Error on initialization") + ))))) + +(defn- validate-ids! [^StormTopology topology] + (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS) + offending (apply any-intersection sets)] + (if-not (empty? offending) + (throw (InvalidTopologyException. + (str "Duplicate component ids: " offending)))) + (doseq [f thrift/STORM-TOPOLOGY-FIELDS + :let [obj-map (.getFieldValue topology f)]] + (if-not (ThriftTopologyUtils/isWorkerHook f) + (do + (doseq [id (keys obj-map)] + (if (Utils/isSystemId id) + (throw (InvalidTopologyException. 
+ (str id " is not a valid component id"))))) + (doseq [obj (vals obj-map) + id (-> obj .get_common .get_streams keys)] + (if (Utils/isSystemId id) + (throw (InvalidTopologyException. + (str id " is not a valid stream id")))))))))) + +(defn all-components [^StormTopology topology] + (apply merge {} + (for [f thrift/STORM-TOPOLOGY-FIELDS] + (if-not (ThriftTopologyUtils/isWorkerHook f) + (.getFieldValue topology f))))) + +(defn component-conf [component] + (->> component + .get_common + .get_json_conf + from-json)) + +(defn validate-basic! [^StormTopology topology] + (validate-ids! topology) + (doseq [f thrift/SPOUT-FIELDS + obj (->> f (.getFieldValue topology) vals)] + (if-not (empty? (-> obj .get_common .get_inputs)) + (throw (InvalidTopologyException. "May not declare inputs for a spout")))) + (doseq [[comp-id comp] (all-components topology) + :let [conf (component-conf comp) + p (-> comp .get_common thrift/parallelism-hint)]] + (when (and (> (conf TOPOLOGY-TASKS) 0) + p + (<= p 0)) + (throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0")) + ))) + +(defn validate-structure! [^StormTopology topology] + ;; validate all the component subscribe from component+stream which actually exists in the topology + ;; and if it is a fields grouping, validate the corresponding field exists + (let [all-components (all-components topology)] + (doseq [[id comp] all-components + :let [inputs (.. comp get_common get_inputs)]] + (doseq [[global-stream-id grouping] inputs + :let [source-component-id (.get_componentId global-stream-id) + source-stream-id (.get_streamId global-stream-id)]] + (if-not (contains? all-components source-component-id) + (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent component [" source-component-id "]"))) + (let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)] + (if-not (contains? 
source-streams source-stream-id) + (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent stream: [" source-stream-id "] of component [" source-component-id "]"))) + (if (= :fields (thrift/grouping-type grouping)) + (let [grouping-fields (set (.get_fields grouping)) + source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set) + diff-fields (set/difference grouping-fields source-stream-fields)] + (when-not (empty? diff-fields) + (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields))))))))))))) + +(defn acker-inputs [^StormTopology topology] + (let [bolt-ids (.. topology get_bolts keySet) + spout-ids (.. topology get_spouts keySet) + spout-inputs (apply merge + (for [id spout-ids] + {[id ACKER-INIT-STREAM-ID] ["id"]} + )) + bolt-inputs (apply merge + (for [id bolt-ids] + {[id ACKER-ACK-STREAM-ID] ["id"] + [id ACKER-FAIL-STREAM-ID] ["id"]} + ))] + (merge spout-inputs bolt-inputs))) + +;; the event logger receives inputs from all the spouts and bolts +;; with a field grouping on component id so that all tuples from a component +;; goes to same executor and can be viewed via logviewer. +(defn eventlogger-inputs [^StormTopology topology] + (let [bolt-ids (.. topology get_bolts keySet) + spout-ids (.. topology get_spouts keySet) + spout-inputs (apply merge + (for [id spout-ids] + {[id EVENTLOGGER-STREAM-ID] ["component-id"]} + )) + bolt-inputs (apply merge + (for [id bolt-ids] + {[id EVENTLOGGER-STREAM-ID] ["component-id"]} + ))] + (merge spout-inputs bolt-inputs))) + +(defn add-acker! [storm-conf ^StormTopology ret] + (let [num-executors (if (nil? 
(storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS)) + acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret) + (new org.apache.storm.daemon.acker) + {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"]) + ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"]) + } + :p num-executors + :conf {TOPOLOGY-TASKS num-executors + TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})] + (dofor [[_ bolt] (.get_bolts ret) + :let [common (.get_common bolt)]] + (do + (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"])) + (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"])) + )) + (dofor [[_ spout] (.get_spouts ret) + :let [common (.get_common spout) + spout-conf (merge + (component-conf spout) + {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]] + (do + ;; this set up tick tuples to cause timeouts to be triggered + (.set_json_conf common (to-json spout-conf)) + (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"])) + (.put_to_inputs common + (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID) + (thrift/mk-direct-grouping)) + (.put_to_inputs common + (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID) + (thrift/mk-direct-grouping)) + )) + (.put_to_bolts ret "__acker" acker-bolt) + )) + +(defn add-metric-streams! [^StormTopology topology] + (doseq [[_ component] (all-components topology) + :let [common (.get_common component)]] + (.put_to_streams common METRICS-STREAM-ID + (thrift/output-fields ["task-info" "data-points"])))) + +(defn add-system-streams! 
[^StormTopology topology] + (doseq [[_ component] (all-components topology) + :let [common (.get_common component)]] + (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"])))) + + +(defn map-occurrences [afn coll] + (->> coll + (reduce (fn [[counts new-coll] x] + (let [occurs (inc (get counts x 0))] + [(assoc counts x occurs) (cons (afn x occurs) new-coll)])) + [{} []]) + (second) + (reverse))) + +(defn number-duplicates + "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]" + [coll] + (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll)) + +(defn metrics-consumer-register-ids + "Generates a list of component ids for each metrics consumer + e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] " + [storm-conf] + (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER) + (map #(get % "class")) + (number-duplicates) + (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %)))) + +(defn metrics-consumer-bolt-specs [storm-conf topology] + (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology))) + inputs (->> (for [comp-id component-ids-that-emit-metrics] + {[comp-id METRICS-STREAM-ID] :shuffle}) + (into {})) + + mk-bolt-spec (fn [class arg p] + (thrift/mk-bolt-spec* + inputs + (org.apache.storm.metric.MetricsConsumerBolt. class arg) + {} :p p :conf {TOPOLOGY-TASKS p}))] + + (map + (fn [component-id register] + [component-id (mk-bolt-spec (get register "class") + (get register "argument") + (or (get register "parallelism.hint") 1))]) + + (metrics-consumer-register-ids storm-conf) + (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)))) + +;; return the fields that event logger bolt expects +(defn eventlogger-bolt-fields [] + [(EventLoggerBolt/FIELD_COMPONENT_ID) (EventLoggerBolt/FIELD_MESSAGE_ID) (EventLoggerBolt/FIELD_TS) (EventLoggerBolt/FIELD_VALUES)] + ) + +(defn add-eventlogger! [storm-conf ^StormTopology ret] + (let [num-executors (if (nil? 
(storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) + eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret) + (EventLoggerBolt.) + {} + :p num-executors + :conf {TOPOLOGY-TASKS num-executors + TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})] + + (doseq [[_ component] (all-components ret) + :let [common (.get_common component)]] + (.put_to_streams common EVENTLOGGER-STREAM-ID (thrift/output-fields (eventlogger-bolt-fields)))) + (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt) + )) + +(defn add-metric-components! [storm-conf ^StormTopology topology] + (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)] + (.put_to_bolts topology comp-id bolt-spec))) + +(defn add-system-components! [conf ^StormTopology topology] + (let [system-bolt-spec (thrift/mk-bolt-spec* + {} + (SystemBolt.) + {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) + METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"]) + CREDENTIALS-CHANGED-STREAM-ID (thrift/output-fields ["creds"])} + :p 0 + :conf {TOPOLOGY-TASKS 0})] + (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec))) + +(defn system-topology! [storm-conf ^StormTopology topology] + (validate-basic! topology) + (let [ret (.deepCopy topology)] + (add-acker! storm-conf ret) + (add-eventlogger! storm-conf ret) + (add-metric-components! storm-conf ret) + (add-system-components! storm-conf ret) + (add-metric-streams! ret) + (add-system-streams! ret) + (validate-structure! ret) + ret + )) + +(defn has-ackers? [storm-conf] + (or (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0))) + +(defn has-eventloggers? [storm-conf] + (or (nil? 
(storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (> (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS) 0))) + +(defn num-start-executors [component] + (thrift/parallelism-hint (.get_common component))) + +(defn storm-task-info + "Returns map from task -> component id" + [^StormTopology user-topology storm-conf] + (->> (system-topology! storm-conf user-topology) + all-components + (map-val (comp #(get % TOPOLOGY-TASKS) component-conf)) + (sort-by first) + (mapcat (fn [[c num-tasks]] (repeat num-tasks c))) + (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1))) + (into {}) + )) + +(defn executor-id->tasks [[first-task-id last-task-id]] + (->> (range first-task-id (inc last-task-id)) + (map int))) + +(defn worker-context [worker] + (WorkerTopologyContext. (:system-topology worker) + (:storm-conf worker) + (:task->component worker) + (:component->sorted-tasks worker) + (:component->stream->fields worker) + (:storm-id worker) + (supervisor-storm-resources-path + (supervisor-stormdist-root (:conf worker) (:storm-id worker))) + (worker-pids-root (:conf worker) (:worker-id worker)) + (:port worker) + (:task-ids worker) + (:default-shared-resources worker) + (:user-shared-resources worker) + )) + + +(defn to-task->node+port [executor->node+port] + (->> executor->node+port + (mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port]))) + (into {}))) + +(defn mk-authorization-handler [klassname conf] + (let [aznClass (if klassname (Class/forName klassname)) + aznHandler (if aznClass (.newInstance aznClass))] + (if aznHandler (.prepare ^IAuthorizer aznHandler conf)) + (log-debug "authorization class name:" klassname + " class:" aznClass + " handler:" aznHandler) + aznHandler + )) + http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/drpc.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj 
b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
new file mode 100644
index 0000000..d6f77c3
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -0,0 +1,285 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.daemon.drpc
+  (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftConnectionType ReqContext])
+  (:import [org.apache.storm.security.auth.authorizer DRPCAuthorizerBase])
+  (:import [org.apache.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
+            DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
+            DistributedRPCInvocations$Processor])
+  (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
+            ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
+  (:import [org.apache.storm.daemon Shutdownable])
+  (:import [java.net InetAddress])
+  (:import [org.apache.storm.generated AuthorizationException]
+           [org.apache.storm.utils VersionInfo])
+  (:use [org.apache.storm config log util])
+  (:use [org.apache.storm.daemon common])
+  (:use [org.apache.storm.ui helpers])
+  (:use compojure.core)
+  (:use ring.middleware.reload)
+  (:require [compojure.handler :as handler])
+  (:require [metrics.meters :refer [defmeter mark!]])
+  (:gen-class))
+
+(defmeter drpc:num-execute-http-requests)
+(defmeter drpc:num-execute-calls)
+(defmeter drpc:num-result-calls)
+(defmeter drpc:num-failRequest-calls)
+(defmeter drpc:num-fetchRequest-calls)
+(defmeter drpc:num-shutdown-calls)
+
+(def STORM-VERSION (VersionInfo/getVersion))
+
+;; Seconds returned by the timeout-sweep loop fn in service-handler (presumably async-loop's delay between runs).
+(defn timeout-check-secs [] 5)
+
+;; Returns the request queue for function, atomically adding an empty ConcurrentLinkedQueue on first use.
+(defn acquire-queue [queues-atom function]
+  (swap! queues-atom
+    (fn [amap]
+      (if-not (amap function)
+        (assoc amap function (ConcurrentLinkedQueue.))
+        amap)))
+  (@queues-atom function))
+
+;; Logs the thrift access when a context is present, then throws AuthorizationException if aclHandler denies operation.
+(defn check-authorization
+  ([aclHandler mapping operation context]
+    (if (not-nil? context)
+      (log-thrift-access (.requestID context) (.remoteAddress context) (.principal context) operation))
+    (if aclHandler
+      (let [context (or context (ReqContext/context))]
+        (if-not (.permit aclHandler context operation mapping)
+          (let [principal (.principal context)
+                user (if principal (.getName principal) "unknown")]
+              (throw (AuthorizationException.
+                       (str "DRPC request '" operation "' for '"
+                            user "' user is not authorized"))))))))
+  ([aclHandler mapping operation]
+    (check-authorization aclHandler mapping operation (ReqContext/context))))
+
+;; TODO: change this to use TimeCacheMap
+;; Builds the DRPC handler. execute enqueues a DRPCRequest and blocks on a per-request Semaphore until
+;; result/failRequest releases it or the periodic sweep expires it after DRPC-REQUEST-TIMEOUT-SECS.
+(defn service-handler [conf]
+  (let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
+        ctr (atom 0)
+        id->sem (atom {})
+        id->result (atom {})
+        id->start (atom {})
+        id->function (atom {})
+        id->request (atom {})
+        request-queues (atom {})
+        cleanup (fn [id] (swap! id->sem dissoc id)
+                  (swap! id->result dissoc id)
+                  (swap! id->function dissoc id)
+                  (swap! id->request dissoc id)
+                  (swap! id->start dissoc id))
+        my-ip (.getHostAddress (InetAddress/getLocalHost))
+        clear-thread (async-loop
+                       (fn []
+                         (doseq [[id start] @id->start]
+                           (when (> (time-delta start) (conf DRPC-REQUEST-TIMEOUT-SECS))
+                             (when-let [sem (@id->sem id)]
+                               (.remove (acquire-queue request-queues (@id->function id)) (@id->request id))
+                               (log-warn "Timeout DRPC request id: " id " start at " start)
+                               (.release sem))
+                             (cleanup id)))
+                         (timeout-check-secs)))]
+    (reify DistributedRPC$Iface
+      (^String execute
+        [this ^String function ^String args]
+        (mark! drpc:num-execute-calls)
+        (log-debug "Received DRPC request for " function " (" args ") at " (System/currentTimeMillis))
+        (check-authorization drpc-acl-handler
+                             {DRPCAuthorizerBase/FUNCTION_NAME function}
+                             "execute")
+        (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
+              ^Semaphore sem (Semaphore. 0)
+              req (DRPCRequest. args id)
+              ^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
+          (swap! id->start assoc id (current-time-secs))
+          (swap! id->sem assoc id sem)
+          (swap! id->function assoc id function)
+          (swap! id->request assoc id req)
+          (.add queue req)
+          (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
+          (.acquire sem)
+          (log-debug "Acquired DRPC result for " function " " args " at " (System/currentTimeMillis))
+          (let [result (@id->result id)]
+            (cleanup id)
+            (log-debug "Returning DRPC result for " function " " args " at " (System/currentTimeMillis))
+            (if (instance? DRPCExecutionException result)
+              (throw result)
+              (if (nil? result)
+                (throw (DRPCExecutionException. "Request timed out"))
+                result)))))
+
+      DistributedRPCInvocations$Iface
+
+      (^void result
+        [this ^String id ^String result]
+        (mark! drpc:num-result-calls)
+        (when-let [func (@id->function id)]
+          (check-authorization drpc-acl-handler
+                               {DRPCAuthorizerBase/FUNCTION_NAME func}
+                               "result")
+          (let [^Semaphore sem (@id->sem id)]
+            (log-debug "Received result " result " for " id " at " (System/currentTimeMillis))
+            (when sem
+              (swap! id->result assoc id result)
+              (.release sem)
+              ))))
+
+      (^void failRequest
+        [this ^String id]
+        (mark! drpc:num-failRequest-calls)
+        (when-let [func (@id->function id)]
+          (check-authorization drpc-acl-handler
+                               {DRPCAuthorizerBase/FUNCTION_NAME func}
+                               "failRequest")
+          (let [^Semaphore sem (@id->sem id)]
+            (when sem
+              (swap! id->result assoc id (DRPCExecutionException. "Request failed"))
+              (.release sem)))))
+
+      (^DRPCRequest fetchRequest
+        [this ^String func]
+        (mark! drpc:num-fetchRequest-calls)
+        (check-authorization drpc-acl-handler
+                             {DRPCAuthorizerBase/FUNCTION_NAME func}
+                             "fetchRequest")
+        (let [^ConcurrentLinkedQueue queue (acquire-queue request-queues func)
+              ret (.poll queue)]
+          (if ret
+            (do (log-debug "Fetched request for " func " at " (System/currentTimeMillis))
+                ret)
+            (DRPCRequest. "" ""))))
+
+      Shutdownable
+
+      (shutdown
+        [this]
+        (mark! drpc:num-shutdown-calls)
+        (.interrupt clear-thread)))))
+
+;; Outermost wrapper of the DRPC ring app: counts every incoming HTTP request on
+;; drpc:num-execute-http-requests, then delegates to handler.
+(defn handle-request [handler]
+  (fn [request]
+    ;; marked per request: webapp itself is only invoked once, when launch-server! builds the app
+    (mark! drpc:num-execute-http-requests)
+    (handler request)))
+
+(defn populate-context!
+  "Populate the Storm RequestContext from an servlet-request. This should be called in each handler"
+  [http-creds-handler servlet-request]
+    (when http-creds-handler
+      (.populateContext http-creds-handler (ReqContext/context) servlet-request)))
+
+;; Ring app serving DRPC execute over HTTP: args come from the POST body or the GET path ("" when absent).
+(defn webapp [handler http-creds-handler]
+  (->
+    (routes
+      (POST "/drpc/:func" [:as {:keys [body servlet-request]} func & m]
+        (let [args (slurp body)]
+          (populate-context! http-creds-handler servlet-request)
+          (.execute handler func args)))
+      (POST "/drpc/:func/" [:as {:keys [body servlet-request]} func & m]
+        (let [args (slurp body)]
+          (populate-context! http-creds-handler servlet-request)
+          (.execute handler func args)))
+      (GET "/drpc/:func/:args" [:as {:keys [servlet-request]} func args & m]
+          (populate-context! http-creds-handler servlet-request)
+          (.execute handler func args))
+      (GET "/drpc/:func/" [:as {:keys [servlet-request]} func & m]
+          (populate-context! http-creds-handler servlet-request)
+          (.execute handler func ""))
+      (GET "/drpc/:func" [:as {:keys [servlet-request]} func & m]
+          (populate-context! http-creds-handler servlet-request)
+          (.execute handler func "")))
+    (wrap-reload '[org.apache.storm.daemon.drpc])
+    handle-request))
+
+;; Starts the DRPC invocations thrift server, the optional HTTP(S) front end and, when DRPC-PORT > 0,
+;; the DRPC thrift server (its .serve is called on the current thread, last).
+(defn launch-server!
+  ([]
+    (log-message "Starting drpc server for storm version '" STORM-VERSION "'")
+    (let [conf (read-storm-config)
+          worker-threads (int (conf DRPC-WORKER-THREADS))
+          queue-size (int (conf DRPC-QUEUE-SIZE))
+          drpc-http-port (int (conf DRPC-HTTP-PORT))
+          drpc-port (int (conf DRPC-PORT))
+          drpc-service-handler (service-handler conf)
+          ;; requests and returns need to be on separate thread pools, since calls to
+          ;; "execute" don't unblock until other thrift methods are called. So if
+          ;; 64 threads are calling execute, the server won't accept the result
+          ;; invocations that will unblock those threads
+          handler-server (when (> drpc-port 0)
+                           (ThriftServer. conf
+                             (DistributedRPC$Processor. drpc-service-handler)
+                             ThriftConnectionType/DRPC))
+          invoke-server (ThriftServer. conf
+                          (DistributedRPCInvocations$Processor. drpc-service-handler)
+                          ThriftConnectionType/DRPC_INVOCATIONS)
+          http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
+      (add-shutdown-hook-with-force-kill-in-1-sec (fn []
+                                                    (if handler-server (.stop handler-server))
+                                                    (.stop invoke-server)))
+      (log-message "Starting Distributed RPC servers...")
+      (future (.serve invoke-server))
+      (when (> drpc-http-port 0)
+        (let [app (-> (webapp drpc-service-handler http-creds-handler)
+                      requests-middleware)
+              filter-class (conf DRPC-HTTP-FILTER)
+              filter-params (conf DRPC-HTTP-FILTER-PARAMS)
+              filters-confs [{:filter-class filter-class
+                              :filter-params filter-params}]
+              https-port (int (conf DRPC-HTTPS-PORT))
+              https-ks-path (conf DRPC-HTTPS-KEYSTORE-PATH)
+              https-ks-password (conf DRPC-HTTPS-KEYSTORE-PASSWORD)
+              https-ks-type (conf DRPC-HTTPS-KEYSTORE-TYPE)
+              https-key-password (conf DRPC-HTTPS-KEY-PASSWORD)
+              https-ts-path (conf DRPC-HTTPS-TRUSTSTORE-PATH)
+              https-ts-password (conf DRPC-HTTPS-TRUSTSTORE-PASSWORD)
+              https-ts-type (conf DRPC-HTTPS-TRUSTSTORE-TYPE)
+              https-want-client-auth (conf DRPC-HTTPS-WANT-CLIENT-AUTH)
+              https-need-client-auth (conf DRPC-HTTPS-NEED-CLIENT-AUTH)]
+
+          (storm-run-jetty
+           {:port drpc-http-port
+            :configurator (fn [server]
+                            (config-ssl server
+                                        https-port
+                                        https-ks-path
+                                        https-ks-password
+                                        https-ks-type
+                                        https-key-password
+                                        https-ts-path
+                                        https-ts-password
+                                        https-ts-type
+                                        https-need-client-auth
+                                        https-want-client-auth)
+                            (config-filter server app filters-confs))})))
+      (start-metrics-reporters)
+      (when handler-server
+        (.serve handler-server)))))
+
+(defn -main []
+  (setup-default-uncaught-exception-handler)
+  (launch-server!))
