http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0786824..2c78ee9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -52,12 +52,16 @@
   (:require [org.apache.storm [cluster :as cluster]
                             [converter :as converter]
                             [stats :as stats]])
+  (:require [org.apache.storm.ui.core :as ui])
   (:require [clojure.set :as set])
   (:import [org.apache.storm.daemon.common StormBase Assignment])
   (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm config])
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
-  (:import [org.apache.storm.utils VersionInfo])
+  (:import [org.apache.storm.utils VersionInfo]
+           (org.apache.storm.metric ClusterMetricsConsumerExecutor)
+           (org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo DataPoint IClusterMetricsConsumer$SupervisorInfo)
+           (org.apache.storm Config))
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
   (:require [metrics.meters :refer [defmeter mark!]])
@@ -164,6 +168,13 @@
         (catch Exception e
           (log-warn-error e "Ingoring exception, Could not initialize " (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN)))))))
 
+(defn mk-cluster-metrics-consumer-executors
+  "One ClusterMetricsConsumerExecutor per STORM-CLUSTER-METRICS-CONSUMER-REGISTER entry,
+  built eagerly (mapv) so construction is not deferred to the first traversal."
+  [storm-conf]
+  (mapv #(ClusterMetricsConsumerExecutor. (get % "class") (get % "argument"))
+        (get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER)))
+
 (defn nimbus-data [conf inimbus]
   (let [forced-scheduler (.getForcedScheduler inimbus)]
     {:conf conf
@@ -203,6 +214,7 @@
      :topo-history-state (nimbus-topo-history-state conf)
      :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
      :nimbus-topology-action-notifier (create-tology-action-notifier conf)
+     :cluster-consumer-executors (mk-cluster-metrics-consumer-executors conf)
      }))
 
 (defn inbox [nimbus]
@@ -1355,12 +1367,43 @@
 (defmethod blob-sync :local [conf nimbus]
   nil)
 
-(defserverfn service-handler [conf inimbus]
-  (.prepare inimbus conf (master-inimbus-dir conf))
-  (log-message "Starting Nimbus with conf " conf)
-  (let [nimbus (nimbus-data conf inimbus)
-        blob-store (:blob-store nimbus)
-        principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
+(defn extract-cluster-metrics
+  "Returns a :cluster-info (ClusterInfo stamped now) / :data-points (DataPoint vector) map for summ."
+  [^ClusterSummary summ]
+  (let [cluster-summ (ui/cluster-summary summ "nimbus")]
+    {:cluster-info (IClusterMetricsConsumer$ClusterInfo. (System/currentTimeMillis))
+     :data-points (mapv (fn [[k v]] (DataPoint. k v))
+                        (select-keys cluster-summ ["supervisors" "topologies" "slotsTotal" "slotsUsed" "slotsFree" "executorsTotal" "tasksTotal"]))}))
+
+(defn extract-supervisors-metrics
+  "Returns a vector with one {:supervisor-info SupervisorInfo, :data-points [DataPoint ...]}
+  map per entry under the \"supervisors\" key of ui/supervisor-summary for summ."
+  [^ClusterSummary summ]
+  (let [supervisors-summ ((ui/supervisor-summary (.get_supervisors summ)) "supervisors")
+        wanted ["slotsTotal" "slotsUsed" "totalMem" "totalCpu" "usedMem" "usedCpu"]]
+    (mapv (fn [supervisor-summ]
+            {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo.
+                                (supervisor-summ "host")
+                                (supervisor-summ "id")
+                                (System/currentTimeMillis))
+             :data-points (mapv (fn [[k v]] (DataPoint. k v)) (select-keys supervisor-summ wanted))})
+          supervisors-summ)))
+
+(defn send-cluster-metrics-to-executors
+  "Fetches the current ClusterSummary from nimbus-service and hands the cluster-level and
+  per-supervisor data points to every executor in (:cluster-consumer-executors nimbus). Returns nil."
+  [nimbus-service nimbus]
+  (let [cluster-summary (.getClusterInfo nimbus-service)
+        {:keys [cluster-info data-points]} (extract-cluster-metrics cluster-summary)
+        supervisors-metrics (extract-supervisors-metrics cluster-summary)]
+    ;; doseq rather than dofor: these calls are made for their side effects only.
+    (doseq [consumer-executor (:cluster-consumer-executors nimbus)]
+      (.handleDataPoints consumer-executor cluster-info data-points)
+      (doseq [{:keys [supervisor-info data-points]} supervisors-metrics]
+        (.handleDataPoints consumer-executor supervisor-info data-points)))))
+
+(defn mk-reified-nimbus [nimbus conf blob-store]
+  (let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
         admin-users (or (.get conf NIMBUS-ADMINS) [])
         get-common-topo-info
           (fn [^String storm-id operation]
@@ -1397,6 +1440,746 @@
                           (doto (ErrorInfo. (:error e) (:time-secs e))
                             (.set_host (:host e))
                             (.set_port (:port e)))))]
+    (reify Nimbus$Iface
+      (^void submitTopologyWithOpts
+        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
+         ^SubmitOptions submitOptions]
+        (try
+          (mark! nimbus:num-submitTopologyWithOpts-calls)
+          (is-leader nimbus)
+          (assert (not-nil? submitOptions))
+          (validate-topology-name! storm-name)
+          (check-authorization! nimbus storm-name nil "submitTopology")
+          (check-storm-active! nimbus storm-name false)
+          (let [topo-conf (from-json serializedConf)]
+            (try
+              (validate-configs-with-schemas topo-conf)
+              (catch IllegalArgumentException ex
+                (throw (InvalidTopologyException. (.getMessage ex)))))
+            (.validate ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus)
+                       storm-name
+                       topo-conf
+                       topology))
+          (swap! 
(:submitted-count nimbus) inc) + (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) + credentials (.get_creds submitOptions) + credentials (when credentials (.get_creds credentials)) + topo-conf (from-json serializedConf) + storm-conf-submitted (normalize-conf + conf + (-> topo-conf + (assoc STORM-ID storm-id) + (assoc TOPOLOGY-NAME storm-name)) + topology) + req (ReqContext/context) + principal (.principal req) + submitter-principal (if principal (.toString principal)) + submitter-user (.toLocal principal-to-local principal) + system-user (System/getProperty "user.name") + topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user))) + storm-conf (-> storm-conf-submitted + (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal "")) + (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as + (assoc TOPOLOGY-USERS topo-acl) + (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL))) + storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf) + storm-conf + (dissoc storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) + total-storm-conf (merge conf storm-conf) + topology (normalize-topology total-storm-conf topology) + storm-cluster-state (:storm-cluster-state nimbus)] + (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)] + (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf)))) + (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user)))) + (throw (AuthorizationException. "Could not determine the user to run this topology as."))) + (system-topology! 
total-storm-conf topology) ;; this validates the structure of the topology + (validate-topology-size topo-conf conf topology) + (when (and (Utils/isZkAuthenticationConfiguredStormServer conf) + (not (Utils/isZkAuthenticationConfiguredTopology storm-conf))) + (throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided."))) + (log-message "Received topology submission for " + storm-name + " with conf " + (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) + ;; lock protects against multiple topologies being submitted at once and + ;; cleanup thread killing topology in b/w assignment and starting the topology + (locking (:submit-lock nimbus) + (check-storm-active! nimbus storm-name false) + ;;cred-update-lock is not needed here because creds are being added for the first time. + (.set-credentials! storm-cluster-state storm-id credentials storm-conf) + (log-message "uploadedJar " uploadedJarLocation) + (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology) + (wait-for-desired-code-replication nimbus total-storm-conf storm-id) + (.setup-heartbeats! storm-cluster-state storm-id) + (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE) + (.setup-backpressure! storm-cluster-state storm-id)) + (notify-topology-action-listener nimbus storm-name "submitTopology") + (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive + TopologyInitialStatus/ACTIVE :active}] + (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)))))) + (catch Throwable e + (log-warn-error e "Topology submission exception. (topology name='" storm-name "')") + (throw e)))) + + (^void submitTopology + [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] + (mark! nimbus:num-submitTopology-calls) + (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology + (SubmitOptions. 
TopologyInitialStatus/ACTIVE))) + + (^void killTopology [this ^String name] + (mark! nimbus:num-killTopology-calls) + (.killTopologyWithOpts this name (KillOptions.))) + + (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options] + (mark! nimbus:num-killTopologyWithOpts-calls) + (check-storm-active! nimbus storm-name true) + (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus) + storm-id (topology-conf STORM-ID) + operation "killTopology"] + (check-authorization! nimbus storm-name topology-conf operation) + (let [wait-amt (if (.is_set_wait_secs options) + (.get_wait_secs options) + )] + (transition-name! nimbus storm-name [:kill wait-amt] true) + (notify-topology-action-listener nimbus storm-name operation)) + (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name) + nimbus topology-conf))) + + (^void rebalance [this ^String storm-name ^RebalanceOptions options] + (mark! nimbus:num-rebalance-calls) + (check-storm-active! nimbus storm-name true) + (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus) + operation "rebalance"] + (check-authorization! nimbus storm-name topology-conf operation) + (let [wait-amt (if (.is_set_wait_secs options) + (.get_wait_secs options)) + num-workers (if (.is_set_num_workers options) + (.get_num_workers options)) + executor-overrides (if (.is_set_num_executors options) + (.get_num_executors options) + {})] + (doseq [[c num-executors] executor-overrides] + (when (<= num-executors 0) + (throw (InvalidTopologyException. "Number of executors must be greater than 0")) + )) + (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true) + + (notify-topology-action-listener nimbus storm-name operation)))) + + (activate [this storm-name] + (mark! nimbus:num-activate-calls) + (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus) + operation "activate"] + (check-authorization! 
nimbus storm-name topology-conf operation) + (transition-name! nimbus storm-name :activate true) + (notify-topology-action-listener nimbus storm-name operation))) + + (deactivate [this storm-name] + (mark! nimbus:num-deactivate-calls) + (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus) + operation "deactivate"] + (check-authorization! nimbus storm-name topology-conf operation) + (transition-name! nimbus storm-name :inactivate true) + (notify-topology-action-listener nimbus storm-name operation))) + + (debug [this storm-name component-id enable? samplingPct] + (mark! nimbus:num-debug-calls) + (let [storm-cluster-state (:storm-cluster-state nimbus) + storm-id (get-storm-id storm-cluster-state storm-name) + topology-conf (try-read-storm-conf conf storm-id blob-store) + ;; make sure samplingPct is within bounds. + spct (Math/max (Math/min samplingPct 100.0) 0.0) + ;; while disabling we retain the sampling pct. + debug-options (if enable? {:enable enable? :samplingpct spct} {:enable enable?}) + storm-base-updates (assoc {} :component->debug (if (empty? component-id) + {storm-id debug-options} + {component-id debug-options}))] + (check-authorization! nimbus storm-name topology-conf "debug") + (when-not storm-id + (throw (NotAliveException. storm-name))) + (log-message "Nimbus setting debug to " enable? " for storm-name '" storm-name "' storm-id '" storm-id "' sampling pct '" spct "'" + (if (not (clojure.string/blank? component-id)) (str " component-id '" component-id "'"))) + (locking (:submit-lock nimbus) + (.update-storm! storm-cluster-state storm-id storm-base-updates)))) + + (^void setWorkerProfiler + [this ^String id ^ProfileRequest profileRequest] + (mark! nimbus:num-setWorkerProfiler-calls) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) + storm-name (topology-conf TOPOLOGY-NAME) + _ (check-authorization! 
nimbus storm-name topology-conf "setWorkerProfiler") + storm-cluster-state (:storm-cluster-state nimbus)] + (.set-worker-profile-request storm-cluster-state id profileRequest))) + + (^List getComponentPendingProfileActions + [this ^String id ^String component_id ^ProfileAction action] + (mark! nimbus:num-getComponentPendingProfileActions-calls) + (let [info (get-common-topo-info id "getComponentPendingProfileActions") + storm-cluster-state (:storm-cluster-state info) + task->component (:task->component info) + {:keys [executor->node+port node->host]} (:assignment info) + executor->host+port (map-val (fn [[node port]] + [(node->host node) port]) + executor->node+port) + nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id) + all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true) + latest-profile-actions (remove nil? (map (fn [nodeInfo] + (->> all-pending-actions-for-topology + (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %))) + (= (:port nodeInfo) (first (.get_port (.get_nodeInfo %)))))) + (filter #(= action (.get_action %))) + (sort-by #(.get_time_stamp %) >) + first)) + nodeinfos))] + (log-message "Latest profile actions for topology " id " component " component_id " " (pr-str latest-profile-actions)) + latest-profile-actions)) + + (^void setLogConfig [this ^String id ^LogConfig log-config-msg] + (mark! nimbus:num-setLogConfig-calls) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) + storm-name (topology-conf TOPOLOGY-NAME) + _ (check-authorization! 
nimbus storm-name topology-conf "setLogConfig") + storm-cluster-state (:storm-cluster-state nimbus) + merged-log-config (or (.topology-log-config storm-cluster-state id nil) (LogConfig.)) + named-loggers (.get_named_logger_level merged-log-config)] + (doseq [[_ level] named-loggers] + (.set_action level LogLevelAction/UNCHANGED)) + (doseq [[logger-name log-config] (.get_named_logger_level log-config-msg)] + (let [action (.get_action log-config)] + (if (clojure.string/blank? logger-name) + (throw (RuntimeException. "Named loggers need a valid name. Use ROOT for the root logger"))) + (condp = action + LogLevelAction/UPDATE + (do (set-logger-timeouts log-config) + (.put_to_named_logger_level merged-log-config logger-name log-config)) + LogLevelAction/REMOVE + (let [named-loggers (.get_named_logger_level merged-log-config)] + (if (and (not (nil? named-loggers)) + (.containsKey named-loggers logger-name)) + (.remove named-loggers logger-name)))))) + (log-message "Setting log config for " storm-name ":" merged-log-config) + (.set-topology-log-config! storm-cluster-state id merged-log-config))) + + (uploadNewCredentials [this storm-name credentials] + (mark! nimbus:num-uploadNewCredentials-calls) + (let [storm-cluster-state (:storm-cluster-state nimbus) + storm-id (get-storm-id storm-cluster-state storm-name) + topology-conf (try-read-storm-conf conf storm-id blob-store) + creds (when credentials (.get_creds credentials))] + (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials") + (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf)))) + + (beginFileUpload [this] + (mark! nimbus:num-beginFileUpload-calls) + (check-authorization! nimbus nil nil "fileUpload") + (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")] + (.put (:uploaders nimbus) + fileloc + (Channels/newChannel (FileOutputStream. 
fileloc))) + (log-message "Uploading file from client to " fileloc) + fileloc + )) + + (^void uploadChunk [this ^String location ^ByteBuffer chunk] + (mark! nimbus:num-uploadChunk-calls) + (check-authorization! nimbus nil nil "fileUpload") + (let [uploaders (:uploaders nimbus) + ^WritableByteChannel channel (.get uploaders location)] + (when-not channel + (throw (RuntimeException. + "File for that location does not exist (or timed out)"))) + (.write channel chunk) + (.put uploaders location channel) + )) + + (^void finishFileUpload [this ^String location] + (mark! nimbus:num-finishFileUpload-calls) + (check-authorization! nimbus nil nil "fileUpload") + (let [uploaders (:uploaders nimbus) + ^WritableByteChannel channel (.get uploaders location)] + (when-not channel + (throw (RuntimeException. + "File for that location does not exist (or timed out)"))) + (.close channel) + (log-message "Finished uploading file from client: " location) + (.remove uploaders location) + )) + + (^String beginFileDownload + [this ^String file] + (mark! nimbus:num-beginFileDownload-calls) + (check-authorization! nimbus nil nil "fileDownload") + (let [is (BufferInputStream. (.getBlob (:blob-store nimbus) file nil) + ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) + (int 65536))) + id (uuid)] + (.put (:downloaders nimbus) id is) + id)) + + (^ByteBuffer downloadChunk [this ^String id] + (mark! nimbus:num-downloadChunk-calls) + (check-authorization! nimbus nil nil "fileDownload") + (let [downloaders (:downloaders nimbus) + ^BufferFileInputStream is (.get downloaders id)] + (when-not is + (throw (RuntimeException. + "Could not find input stream for that id"))) + (let [ret (.read is)] + (.put downloaders id is) + (when (empty? ret) + (.remove downloaders id)) + (ByteBuffer/wrap ret) + ))) + + (^String getNimbusConf [this] + (mark! nimbus:num-getNimbusConf-calls) + (check-authorization! 
nimbus nil nil "getNimbusConf") + (to-json (:conf nimbus))) + + (^LogConfig getLogConfig [this ^String id] + (mark! nimbus:num-getLogConfig-calls) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) + storm-name (topology-conf TOPOLOGY-NAME) + _ (check-authorization! nimbus storm-name topology-conf "getLogConfig") + storm-cluster-state (:storm-cluster-state nimbus) + log-config (.topology-log-config storm-cluster-state id nil)] + (if log-config log-config (LogConfig.)))) + + (^String getTopologyConf [this ^String id] + (mark! nimbus:num-getTopologyConf-calls) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) + storm-name (topology-conf TOPOLOGY-NAME)] + (check-authorization! nimbus storm-name topology-conf "getTopologyConf") + (to-json topology-conf))) + + (^StormTopology getTopology [this ^String id] + (mark! nimbus:num-getTopology-calls) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) + storm-name (topology-conf TOPOLOGY-NAME)] + (check-authorization! nimbus storm-name topology-conf "getTopology") + (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus))))) + + (^StormTopology getUserTopology [this ^String id] + (mark! nimbus:num-getUserTopology-calls) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) + storm-name (topology-conf TOPOLOGY-NAME)] + (check-authorization! nimbus storm-name topology-conf "getUserTopology") + (try-read-storm-topology id blob-store))) + + (^ClusterSummary getClusterInfo [this] + (mark! nimbus:num-getClusterInfo-calls) + (check-authorization! nimbus nil nil "getClusterInfo") + (let [storm-cluster-state (:storm-cluster-state nimbus) + supervisor-infos (all-supervisor-info storm-cluster-state) + ;; TODO: need to get the port info about supervisors... + ;; in standalone just look at metadata, otherwise just say N/A? 
+ supervisor-summaries (dofor [[id info] supervisor-infos] + (let [ports (set (:meta info)) ;;TODO: this is only true for standalone + sup-sum (SupervisorSummary. (:hostname info) + (:uptime-secs info) + (count ports) + (count (:used-ports info)) + id) ] + (.set_total_resources sup-sum (map-val double (:resources-map info))) + (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)] + (.set_used_mem sup-sum used-mem) + (.set_used_cpu sup-sum used-cpu)) + (when-let [version (:version info)] (.set_version sup-sum version)) + sup-sum)) + nimbus-uptime ((:uptime nimbus)) + bases (topology-bases storm-cluster-state) + nimbuses (.nimbuses storm-cluster-state) + + ;;update the isLeader field for each nimbus summary + _ (let [leader (.getLeader (:leader-elector nimbus)) + leader-host (.getHost leader) + leader-port (.getPort leader)] + (doseq [nimbus-summary nimbuses] + (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary))) + (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary)))))) + + topology-summaries (dofor [[id base] bases :when base] + (let [assignment (.assignment-info storm-cluster-state id nil) + topo-summ (TopologySummary. 
id + (:storm-name base) + (->> (:executor->node+port assignment) + keys + (mapcat executor-id->tasks) + count) + (->> (:executor->node+port assignment) + keys + count) + (->> (:executor->node+port assignment) + vals + set + count) + (time-delta (:launch-time-secs base)) + (extract-status-str base))] + (when-let [owner (:owner base)] (.set_owner topo-summ owner)) + (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status)) + (when-let [resources (.get @(:id->resources nimbus) id)] + (.set_requested_memonheap topo-summ (get resources 0)) + (.set_requested_memoffheap topo-summ (get resources 1)) + (.set_requested_cpu topo-summ (get resources 2)) + (.set_assigned_memonheap topo-summ (get resources 3)) + (.set_assigned_memoffheap topo-summ (get resources 4)) + (.set_assigned_cpu topo-summ (get resources 5))) + (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus)) + topo-summ)) + ret (ClusterSummary. supervisor-summaries + topology-summaries + nimbuses) + _ (.set_nimbus_uptime_secs ret nimbus-uptime)] + ret)) + + (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options] + (mark! nimbus:num-getTopologyInfoWithOpts-calls) + (let [{:keys [storm-name + storm-cluster-state + all-components + launch-time-secs + assignment + beats + task->component + base]} (get-common-topo-info storm-id "getTopologyInfo") + num-err-choice (or (.get_num_err_choice options) + NumErrorsChoice/ALL) + errors-fn (condp = num-err-choice + NumErrorsChoice/NONE (fn [& _] ()) ;; empty list only + NumErrorsChoice/ONE (comp #(remove nil? 
%) + list + get-last-error) + NumErrorsChoice/ALL get-errors + ;; Default + (do + (log-warn "Got invalid NumErrorsChoice '" + num-err-choice + "'") + get-errors)) + errors (->> all-components + (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)])) + (into {})) + executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)] + (let [host (-> assignment :node->host (get node)) + heartbeat (get beats executor) + stats (:stats heartbeat) + stats (if stats + (stats/thriftify-executor-stats stats))] + (doto + (ExecutorSummary. (thriftify-executor-id executor) + (-> executor first task->component) + host + port + (nil-to-zero (:uptime heartbeat))) + (.set_stats stats)) + )) + topo-info (TopologyInfo. storm-id + storm-name + (time-delta launch-time-secs) + executor-summaries + (extract-status-str base) + errors + )] + (when-let [owner (:owner base)] (.set_owner topo-info owner)) + (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status)) + (when-let [resources (.get @(:id->resources nimbus) storm-id)] + (.set_requested_memonheap topo-info (get resources 0)) + (.set_requested_memoffheap topo-info (get resources 1)) + (.set_requested_cpu topo-info (get resources 2)) + (.set_assigned_memonheap topo-info (get resources 3)) + (.set_assigned_memoffheap topo-info (get resources 4)) + (.set_assigned_cpu topo-info (get resources 5))) + (when-let [component->debug (:component->debug base)] + (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug))) + (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus)) + topo-info)) + + (^TopologyInfo getTopologyInfo [this ^String topology-id] + (mark! nimbus:num-getTopologyInfo-calls) + (.getTopologyInfoWithOpts this + topology-id + (doto (GetInfoOptions.) 
(.set_num_err_choice NumErrorsChoice/ALL)))) + + (^String beginCreateBlob [this + ^String blob-key + ^SettableBlobMeta blob-meta] + (let [session-id (uuid)] + (.put (:blob-uploaders nimbus) + session-id + (.createBlob (:blob-store nimbus) blob-key blob-meta (get-subject))) + (log-message "Created blob for " blob-key + " with session id " session-id) + (str session-id))) + + (^String beginUpdateBlob [this ^String blob-key] + (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus) + blob-key (get-subject))] + (let [session-id (uuid)] + (.put (:blob-uploaders nimbus) session-id os) + (log-message "Created upload session for " blob-key + " with id " session-id) + (str session-id)))) + + (^void createStateInZookeeper [this ^String blob-key] + (let [storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) + nimbus-host-port-info (:nimbus-host-port-info nimbus) + conf (:conf nimbus)] + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf))) + (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info))) + + (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk] + (let [uploaders (:blob-uploaders nimbus)] + (if-let [^AtomicOutputStream os (.get uploaders session)] + (let [chunk-array (.array blob-chunk) + remaining (.remaining blob-chunk) + array-offset (.arrayOffset blob-chunk) + position (.position blob-chunk)] + (.write os chunk-array (+ array-offset position) remaining) + (.put uploaders session os)) + (throw-runtime "Blob for session " + session + " does not exist (or timed out)")))) + + (^void finishBlobUpload [this ^String session] + (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)] + (do + (.close os) + (log-message "Finished uploading blob for session " + session + ". 
Closing session.") + (.remove (:blob-uploaders nimbus) session)) + (throw-runtime "Blob for session " + session + " does not exist (or timed out)"))) + + (^void cancelBlobUpload [this ^String session] + (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)] + (do + (.cancel os) + (log-message "Canceled uploading blob for session " + session + ". Closing session.") + (.remove (:blob-uploaders nimbus) session)) + (throw-runtime "Blob for session " + session + " does not exist (or timed out)"))) + + (^ReadableBlobMeta getBlobMeta [this ^String blob-key] + (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus) + blob-key (get-subject))] + ret)) + + (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta] + (->> (ReqContext/context) + (.subject) + (.setBlobMeta (:blob-store nimbus) blob-key blob-meta))) + + (^BeginDownloadResult beginBlobDownload [this ^String blob-key] + (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus) + blob-key (get-subject))] + (let [session-id (uuid) + ret (BeginDownloadResult. (.getVersion is) (str session-id))] + (.set_data_size ret (.getFileLength is)) + (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536)))) + (log-message "Created download session for " blob-key + " with id " session-id) + ret))) + + (^ByteBuffer downloadBlobChunk [this ^String session] + (let [downloaders (:blob-downloaders nimbus) + ^BufferInputStream is (.get downloaders session)] + (when-not is + (throw (RuntimeException. + "Could not find input stream for session " session))) + (let [ret (.read is)] + (.put downloaders session is) + (when (empty? 
ret) + (.close is) + (.remove downloaders session)) + (log-debug "Sending " (alength ret) " bytes") + (ByteBuffer/wrap ret)))) + + (^void deleteBlob [this ^String blob-key] + (let [subject (->> (ReqContext/context) + (.subject))] + (.deleteBlob (:blob-store nimbus) blob-key subject) + (when (instance? LocalFsBlobStore blob-store) + (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key) + (.remove-key-version! (:storm-cluster-state nimbus) blob-key)) + (log-message "Deleted blob for key " blob-key))) + + (^ListBlobsResult listBlobs [this ^String session] + (let [listers (:blob-listers nimbus) + ^Iterator keys-it (if (clojure.string/blank? session) + (.listKeys (:blob-store nimbus)) + (.get listers session)) + _ (or keys-it (throw-runtime "Blob list for session " + session + " does not exist (or timed out)")) + + ;; Create a new session id if the user gave an empty session string. + ;; This is the use case when the user wishes to list blobs + ;; starting from the beginning. + session (if (clojure.string/blank? session) + (let [new-session (uuid)] + (log-message "Creating new session for downloading list " new-session) + new-session) + session)] + (if-not (.hasNext keys-it) + (do + (.remove listers session) + (log-message "No more blobs to list for session " session) + ;; A blank result communicates that there are no more blobs. + (ListBlobsResult. (ArrayList. 0) (str session))) + (let [^List list-chunk (->> keys-it + (iterator-seq) + (take 100) ;; Limit to next 100 keys + (ArrayList.))] + (log-message session " downloading " (.size list-chunk) " entries") + (.put listers session keys-it) + (ListBlobsResult. 
list-chunk (str session)))))) + + (^int getBlobReplication [this ^String blob-key] + (->> (ReqContext/context) + (.subject) + (.getBlobReplication (:blob-store nimbus) blob-key))) + + (^int updateBlobReplication [this ^String blob-key ^int replication] + (->> (ReqContext/context) + (.subject) + (.updateBlobReplication (:blob-store nimbus) blob-key replication))) + + (^TopologyPageInfo getTopologyPageInfo + [this ^String topo-id ^String window ^boolean include-sys?] + (mark! nimbus:num-getTopologyPageInfo-calls) + (let [info (get-common-topo-info topo-id "getTopologyPageInfo") + + exec->node+port (:executor->node+port (:assignment info)) + last-err-fn (partial get-last-error + (:storm-cluster-state info) + topo-id) + topo-page-info (stats/agg-topo-execs-stats topo-id + exec->node+port + (:task->component info) + (:beats info) + (:topology info) + window + include-sys? + last-err-fn)] + (when-let [owner (:owner (:base info))] + (.set_owner topo-page-info owner)) + (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)] + (.set_sched_status topo-page-info sched-status)) + (when-let [resources (.get @(:id->resources nimbus) topo-id)] + (.set_requested_memonheap topo-page-info (get resources 0)) + (.set_requested_memoffheap topo-page-info (get resources 1)) + (.set_requested_cpu topo-page-info (get resources 2)) + (.set_assigned_memonheap topo-page-info (get resources 3)) + (.set_assigned_memoffheap topo-page-info (get resources 4)) + (.set_assigned_cpu topo-page-info (get resources 5))) + (doto topo-page-info + (.set_name (:storm-name info)) + (.set_status (extract-status-str (:base info))) + (.set_uptime_secs (time-delta (:launch-time-secs info))) + (.set_topology_conf (to-json (try-read-storm-conf conf + topo-id (:blob-store nimbus)))) + (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus))) + (when-let [debug-options + (get-in info [:base :component->debug topo-id])] + (.set_debug_options + topo-page-info + 
(converter/thriftify-debugoptions debug-options))) + topo-page-info)) + + (^ComponentPageInfo getComponentPageInfo + [this + ^String topo-id + ^String component-id + ^String window + ^boolean include-sys?] + (mark! nimbus:num-getComponentPageInfo-calls) + (let [info (get-common-topo-info topo-id "getComponentPageInfo") + {:keys [executor->node+port node->host]} (:assignment info) + executor->host+port (map-val (fn [[node port]] + [(node->host node) port]) + executor->node+port) + comp-page-info (stats/agg-comp-execs-stats executor->host+port + (:task->component info) + (:beats info) + window + include-sys? + topo-id + (:topology info) + component-id)] + (doto comp-page-info + (.set_topology_name (:storm-name info)) + (.set_errors (get-errors (:storm-cluster-state info) + topo-id + component-id)) + (.set_topology_status (extract-status-str (:base info)))) + (when-let [debug-options + (get-in info [:base :component->debug component-id])] + (.set_debug_options + comp-page-info + (converter/thriftify-debugoptions debug-options))) + ;; Add the event logger details + (let [component->tasks (reverse-map (:task->component info))] + (if (contains? component->tasks EVENTLOGGER-COMPONENT-ID) + (let [eventlogger-tasks (sort (get component->tasks + EVENTLOGGER-COMPONENT-ID)) + ;; Find the task the events from this component route to. + task-index (mod (TupleUtils/listHashCode [component-id]) + (count eventlogger-tasks)) + task-id (nth eventlogger-tasks task-index) + eventlogger-exec (first (filter (fn [[start stop]] + (between? 
task-id start stop)) + (keys executor->host+port))) + [host port] (get executor->host+port eventlogger-exec)] + (if (and host port) + (doto comp-page-info + (.set_eventlog_host host) + (.set_eventlog_port port)))))) + comp-page-info)) + + (^TopologyHistoryInfo getTopologyHistory [this ^String user] + (let [storm-cluster-state (:storm-cluster-state nimbus) + bases (topology-bases storm-cluster-state) + assigned-topology-ids (.assignments storm-cluster-state nil) + user-group-match-fn (fn [topo-id user conf] + (let [topology-conf (try-read-storm-conf conf topo-id (:blob-store nimbus)) + groups (get-topo-logs-groups topology-conf)] + (or (nil? user) + (some #(= % user) admin-users) + (does-users-group-intersect? user groups conf) + (some #(= % user) (get-topo-logs-users topology-conf))))) + active-ids-for-user (filter #(user-group-match-fn % user (:conf nimbus)) assigned-topology-ids) + topo-history-list (read-topology-history nimbus user admin-users)] + (TopologyHistoryInfo. (distinct (concat active-ids-for-user topo-history-list))))) + + Shutdownable + (shutdown [this] + (mark! nimbus:num-shutdown-calls) + (log-message "Shutting down master") + (cancel-timer (:timer nimbus)) + (.disconnect (:storm-cluster-state nimbus)) + (.cleanup (:downloaders nimbus)) + (.cleanup (:uploaders nimbus)) + (.shutdown (:blob-store nimbus)) + (.close (:leader-elector nimbus)) + (when (:nimbus-topology-action-notifier nimbus) (.cleanup (:nimbus-topology-action-notifier nimbus))) + (log-message "Shut down master")) + DaemonCommon + (waiting? [this] + (timer-waiting? 
(:timer nimbus)))))) + +(defserverfn service-handler [conf inimbus] + (.prepare inimbus conf (master-inimbus-dir conf)) + (log-message "Starting Nimbus with conf " conf) + (let [nimbus (nimbus-data conf inimbus) + blob-store (:blob-store nimbus)] (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) conf) ;add to nimbuses @@ -1415,6 +2198,9 @@ (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus))) (setup-blobstore nimbus)) + (doseq [consumer (:cluster-consumer-executors nimbus)] + (.prepare consumer)) + (when (is-leader nimbus :throw-exception false) (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))] (transition! nimbus storm-id :startup))) @@ -1457,740 +2243,16 @@ (start-metrics-reporters conf) - (reify Nimbus$Iface - (^void submitTopologyWithOpts - [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology - ^SubmitOptions submitOptions] - (try - (mark! nimbus:num-submitTopologyWithOpts-calls) - (is-leader nimbus) - (assert (not-nil? submitOptions)) - (validate-topology-name! storm-name) - (check-authorization! nimbus storm-name nil "submitTopology") - (check-storm-active! nimbus storm-name false) - (let [topo-conf (from-json serializedConf)] - (try - (validate-configs-with-schemas topo-conf) - (catch IllegalArgumentException ex - (throw (InvalidTopologyException. (.getMessage ex))))) - (.validate ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) - storm-name - topo-conf - topology)) - (swap! 
(:submitted-count nimbus) inc) - (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) - credentials (.get_creds submitOptions) - credentials (when credentials (.get_creds credentials)) - topo-conf (from-json serializedConf) - storm-conf-submitted (normalize-conf - conf - (-> topo-conf - (assoc STORM-ID storm-id) - (assoc TOPOLOGY-NAME storm-name)) - topology) - req (ReqContext/context) - principal (.principal req) - submitter-principal (if principal (.toString principal)) - submitter-user (.toLocal principal-to-local principal) - system-user (System/getProperty "user.name") - topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user))) - storm-conf (-> storm-conf-submitted - (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal "")) - (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as - (assoc TOPOLOGY-USERS topo-acl) - (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL))) - storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf) - storm-conf - (dissoc storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) - total-storm-conf (merge conf storm-conf) - topology (normalize-topology total-storm-conf topology) - storm-cluster-state (:storm-cluster-state nimbus)] - (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)] - (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf)))) - (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user)))) - (throw (AuthorizationException. "Could not determine the user to run this topology as."))) - (system-topology! 
total-storm-conf topology) ;; this validates the structure of the topology - (validate-topology-size topo-conf conf topology) - (when (and (Utils/isZkAuthenticationConfiguredStormServer conf) - (not (Utils/isZkAuthenticationConfiguredTopology storm-conf))) - (throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided."))) - (log-message "Received topology submission for " - storm-name - " with conf " - (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) - ;; lock protects against multiple topologies being submitted at once and - ;; cleanup thread killing topology in b/w assignment and starting the topology - (locking (:submit-lock nimbus) - (check-storm-active! nimbus storm-name false) - ;;cred-update-lock is not needed here because creds are being added for the first time. - (.set-credentials! storm-cluster-state storm-id credentials storm-conf) - (log-message "uploadedJar " uploadedJarLocation) - (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology) - (wait-for-desired-code-replication nimbus total-storm-conf storm-id) - (.setup-heartbeats! storm-cluster-state storm-id) - (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE) - (.setup-backpressure! storm-cluster-state storm-id)) - (notify-topology-action-listener nimbus storm-name "submitTopology") - (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive - TopologyInitialStatus/ACTIVE :active}] - (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)))))) - (catch Throwable e - (log-warn-error e "Topology submission exception. (topology name='" storm-name "')") - (throw e)))) - - (^void submitTopology - [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] - (mark! nimbus:num-submitTopology-calls) - (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology - (SubmitOptions. 
TopologyInitialStatus/ACTIVE))) - - (^void killTopology [this ^String name] - (mark! nimbus:num-killTopology-calls) - (.killTopologyWithOpts this name (KillOptions.))) - - (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options] - (mark! nimbus:num-killTopologyWithOpts-calls) - (check-storm-active! nimbus storm-name true) - (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus) - storm-id (topology-conf STORM-ID) - operation "killTopology"] - (check-authorization! nimbus storm-name topology-conf operation) - (let [wait-amt (if (.is_set_wait_secs options) - (.get_wait_secs options) - )] - (transition-name! nimbus storm-name [:kill wait-amt] true) - (notify-topology-action-listener nimbus storm-name operation)) - (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name) - nimbus topology-conf))) - - (^void rebalance [this ^String storm-name ^RebalanceOptions options] - (mark! nimbus:num-rebalance-calls) - (check-storm-active! nimbus storm-name true) - (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus) - operation "rebalance"] - (check-authorization! nimbus storm-name topology-conf operation) - (let [wait-amt (if (.is_set_wait_secs options) - (.get_wait_secs options)) - num-workers (if (.is_set_num_workers options) - (.get_num_workers options)) - executor-overrides (if (.is_set_num_executors options) - (.get_num_executors options) - {})] - (doseq [[c num-executors] executor-overrides] - (when (<= num-executors 0) - (throw (InvalidTopologyException. "Number of executors must be greater than 0")) - )) - (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true) - - (notify-topology-action-listener nimbus storm-name operation)))) - - (activate [this storm-name] - (mark! nimbus:num-activate-calls) - (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus) - operation "activate"] - (check-authorization! 
nimbus storm-name topology-conf operation) - (transition-name! nimbus storm-name :activate true) - (notify-topology-action-listener nimbus storm-name operation))) - - (deactivate [this storm-name] - (mark! nimbus:num-deactivate-calls) - (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus) - operation "deactivate"] - (check-authorization! nimbus storm-name topology-conf operation) - (transition-name! nimbus storm-name :inactivate true) - (notify-topology-action-listener nimbus storm-name operation))) - - (debug [this storm-name component-id enable? samplingPct] - (mark! nimbus:num-debug-calls) - (let [storm-cluster-state (:storm-cluster-state nimbus) - storm-id (get-storm-id storm-cluster-state storm-name) - topology-conf (try-read-storm-conf conf storm-id blob-store) - ;; make sure samplingPct is within bounds. - spct (Math/max (Math/min samplingPct 100.0) 0.0) - ;; while disabling we retain the sampling pct. - debug-options (if enable? {:enable enable? :samplingpct spct} {:enable enable?}) - storm-base-updates (assoc {} :component->debug (if (empty? component-id) - {storm-id debug-options} - {component-id debug-options}))] - (check-authorization! nimbus storm-name topology-conf "debug") - (when-not storm-id - (throw (NotAliveException. storm-name))) - (log-message "Nimbus setting debug to " enable? " for storm-name '" storm-name "' storm-id '" storm-id "' sampling pct '" spct "'" - (if (not (clojure.string/blank? component-id)) (str " component-id '" component-id "'"))) - (locking (:submit-lock nimbus) - (.update-storm! storm-cluster-state storm-id storm-base-updates)))) - - (^void setWorkerProfiler - [this ^String id ^ProfileRequest profileRequest] - (mark! nimbus:num-setWorkerProfiler-calls) - (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) - storm-name (topology-conf TOPOLOGY-NAME) - _ (check-authorization! 
nimbus storm-name topology-conf "setWorkerProfiler") - storm-cluster-state (:storm-cluster-state nimbus)] - (.set-worker-profile-request storm-cluster-state id profileRequest))) - - (^List getComponentPendingProfileActions - [this ^String id ^String component_id ^ProfileAction action] - (mark! nimbus:num-getComponentPendingProfileActions-calls) - (let [info (get-common-topo-info id "getComponentPendingProfileActions") - storm-cluster-state (:storm-cluster-state info) - task->component (:task->component info) - {:keys [executor->node+port node->host]} (:assignment info) - executor->host+port (map-val (fn [[node port]] - [(node->host node) port]) - executor->node+port) - nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id) - all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true) - latest-profile-actions (remove nil? (map (fn [nodeInfo] - (->> all-pending-actions-for-topology - (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %))) - (= (:port nodeInfo) (first (.get_port (.get_nodeInfo %)))))) - (filter #(= action (.get_action %))) - (sort-by #(.get_time_stamp %) >) - first)) - nodeinfos))] - (log-message "Latest profile actions for topology " id " component " component_id " " (pr-str latest-profile-actions)) - latest-profile-actions)) - - (^void setLogConfig [this ^String id ^LogConfig log-config-msg] - (mark! nimbus:num-setLogConfig-calls) - (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) - storm-name (topology-conf TOPOLOGY-NAME) - _ (check-authorization! 
nimbus storm-name topology-conf "setLogConfig") - storm-cluster-state (:storm-cluster-state nimbus) - merged-log-config (or (.topology-log-config storm-cluster-state id nil) (LogConfig.)) - named-loggers (.get_named_logger_level merged-log-config)] - (doseq [[_ level] named-loggers] - (.set_action level LogLevelAction/UNCHANGED)) - (doseq [[logger-name log-config] (.get_named_logger_level log-config-msg)] - (let [action (.get_action log-config)] - (if (clojure.string/blank? logger-name) - (throw (RuntimeException. "Named loggers need a valid name. Use ROOT for the root logger"))) - (condp = action - LogLevelAction/UPDATE - (do (set-logger-timeouts log-config) - (.put_to_named_logger_level merged-log-config logger-name log-config)) - LogLevelAction/REMOVE - (let [named-loggers (.get_named_logger_level merged-log-config)] - (if (and (not (nil? named-loggers)) - (.containsKey named-loggers logger-name)) - (.remove named-loggers logger-name)))))) - (log-message "Setting log config for " storm-name ":" merged-log-config) - (.set-topology-log-config! storm-cluster-state id merged-log-config))) - - (uploadNewCredentials [this storm-name credentials] - (mark! nimbus:num-uploadNewCredentials-calls) - (let [storm-cluster-state (:storm-cluster-state nimbus) - storm-id (get-storm-id storm-cluster-state storm-name) - topology-conf (try-read-storm-conf conf storm-id blob-store) - creds (when credentials (.get_creds credentials))] - (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials") - (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf)))) - - (beginFileUpload [this] - (mark! nimbus:num-beginFileUpload-calls) - (check-authorization! nimbus nil nil "fileUpload") - (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")] - (.put (:uploaders nimbus) - fileloc - (Channels/newChannel (FileOutputStream. 
fileloc))) - (log-message "Uploading file from client to " fileloc) - fileloc - )) - - (^void uploadChunk [this ^String location ^ByteBuffer chunk] - (mark! nimbus:num-uploadChunk-calls) - (check-authorization! nimbus nil nil "fileUpload") - (let [uploaders (:uploaders nimbus) - ^WritableByteChannel channel (.get uploaders location)] - (when-not channel - (throw (RuntimeException. - "File for that location does not exist (or timed out)"))) - (.write channel chunk) - (.put uploaders location channel) - )) - - (^void finishFileUpload [this ^String location] - (mark! nimbus:num-finishFileUpload-calls) - (check-authorization! nimbus nil nil "fileUpload") - (let [uploaders (:uploaders nimbus) - ^WritableByteChannel channel (.get uploaders location)] - (when-not channel - (throw (RuntimeException. - "File for that location does not exist (or timed out)"))) - (.close channel) - (log-message "Finished uploading file from client: " location) - (.remove uploaders location) - )) - - (^String beginFileDownload - [this ^String file] - (mark! nimbus:num-beginFileDownload-calls) - (check-authorization! nimbus nil nil "fileDownload") - (let [is (BufferInputStream. (.getBlob (:blob-store nimbus) file nil) - ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) - (int 65536))) - id (uuid)] - (.put (:downloaders nimbus) id is) - id)) - - (^ByteBuffer downloadChunk [this ^String id] - (mark! nimbus:num-downloadChunk-calls) - (check-authorization! nimbus nil nil "fileDownload") - (let [downloaders (:downloaders nimbus) - ^BufferFileInputStream is (.get downloaders id)] - (when-not is - (throw (RuntimeException. - "Could not find input stream for that id"))) - (let [ret (.read is)] - (.put downloaders id is) - (when (empty? ret) - (.remove downloaders id)) - (ByteBuffer/wrap ret) - ))) - - (^String getNimbusConf [this] - (mark! nimbus:num-getNimbusConf-calls) - (check-authorization! 
nimbus nil nil "getNimbusConf") - (to-json (:conf nimbus))) - - (^LogConfig getLogConfig [this ^String id] - (mark! nimbus:num-getLogConfig-calls) - (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) - storm-name (topology-conf TOPOLOGY-NAME) - _ (check-authorization! nimbus storm-name topology-conf "getLogConfig") - storm-cluster-state (:storm-cluster-state nimbus) - log-config (.topology-log-config storm-cluster-state id nil)] - (if log-config log-config (LogConfig.)))) - - (^String getTopologyConf [this ^String id] - (mark! nimbus:num-getTopologyConf-calls) - (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) - storm-name (topology-conf TOPOLOGY-NAME)] - (check-authorization! nimbus storm-name topology-conf "getTopologyConf") - (to-json topology-conf))) - - (^StormTopology getTopology [this ^String id] - (mark! nimbus:num-getTopology-calls) - (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) - storm-name (topology-conf TOPOLOGY-NAME)] - (check-authorization! nimbus storm-name topology-conf "getTopology") - (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus))))) - - (^StormTopology getUserTopology [this ^String id] - (mark! nimbus:num-getUserTopology-calls) - (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) - storm-name (topology-conf TOPOLOGY-NAME)] - (check-authorization! nimbus storm-name topology-conf "getUserTopology") - (try-read-storm-topology id blob-store))) - - (^ClusterSummary getClusterInfo [this] - (mark! nimbus:num-getClusterInfo-calls) - (check-authorization! nimbus nil nil "getClusterInfo") - (let [storm-cluster-state (:storm-cluster-state nimbus) - supervisor-infos (all-supervisor-info storm-cluster-state) - ;; TODO: need to get the port info about supervisors... - ;; in standalone just look at metadata, otherwise just say N/A? 
- supervisor-summaries (dofor [[id info] supervisor-infos] - (let [ports (set (:meta info)) ;;TODO: this is only true for standalone - sup-sum (SupervisorSummary. (:hostname info) - (:uptime-secs info) - (count ports) - (count (:used-ports info)) - id) ] - (.set_total_resources sup-sum (map-val double (:resources-map info))) - (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)] - (.set_used_mem sup-sum used-mem) - (.set_used_cpu sup-sum used-cpu)) - (when-let [version (:version info)] (.set_version sup-sum version)) - sup-sum)) - nimbus-uptime ((:uptime nimbus)) - bases (topology-bases storm-cluster-state) - nimbuses (.nimbuses storm-cluster-state) - - ;;update the isLeader field for each nimbus summary - _ (let [leader (.getLeader (:leader-elector nimbus)) - leader-host (.getHost leader) - leader-port (.getPort leader)] - (doseq [nimbus-summary nimbuses] - (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary))) - (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary)))))) - - topology-summaries (dofor [[id base] bases :when base] - (let [assignment (.assignment-info storm-cluster-state id nil) - topo-summ (TopologySummary. 
id - (:storm-name base) - (->> (:executor->node+port assignment) - keys - (mapcat executor-id->tasks) - count) - (->> (:executor->node+port assignment) - keys - count) - (->> (:executor->node+port assignment) - vals - set - count) - (time-delta (:launch-time-secs base)) - (extract-status-str base))] - (when-let [owner (:owner base)] (.set_owner topo-summ owner)) - (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status)) - (when-let [resources (.get @(:id->resources nimbus) id)] - (.set_requested_memonheap topo-summ (get resources 0)) - (.set_requested_memoffheap topo-summ (get resources 1)) - (.set_requested_cpu topo-summ (get resources 2)) - (.set_assigned_memonheap topo-summ (get resources 3)) - (.set_assigned_memoffheap topo-summ (get resources 4)) - (.set_assigned_cpu topo-summ (get resources 5))) - (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus)) - topo-summ)) - ret (ClusterSummary. supervisor-summaries - topology-summaries - nimbuses) - _ (.set_nimbus_uptime_secs ret nimbus-uptime)] - ret)) - - (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options] - (mark! nimbus:num-getTopologyInfoWithOpts-calls) - (let [{:keys [storm-name - storm-cluster-state - all-components - launch-time-secs - assignment - beats - task->component - base]} (get-common-topo-info storm-id "getTopologyInfo") - num-err-choice (or (.get_num_err_choice options) - NumErrorsChoice/ALL) - errors-fn (condp = num-err-choice - NumErrorsChoice/NONE (fn [& _] ()) ;; empty list only - NumErrorsChoice/ONE (comp #(remove nil? 
%) - list - get-last-error) - NumErrorsChoice/ALL get-errors - ;; Default - (do - (log-warn "Got invalid NumErrorsChoice '" - num-err-choice - "'") - get-errors)) - errors (->> all-components - (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)])) - (into {})) - executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)] - (let [host (-> assignment :node->host (get node)) - heartbeat (get beats executor) - stats (:stats heartbeat) - stats (if stats - (stats/thriftify-executor-stats stats))] - (doto - (ExecutorSummary. (thriftify-executor-id executor) - (-> executor first task->component) - host - port - (nil-to-zero (:uptime heartbeat))) - (.set_stats stats)) - )) - topo-info (TopologyInfo. storm-id - storm-name - (time-delta launch-time-secs) - executor-summaries - (extract-status-str base) - errors - )] - (when-let [owner (:owner base)] (.set_owner topo-info owner)) - (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status)) - (when-let [resources (.get @(:id->resources nimbus) storm-id)] - (.set_requested_memonheap topo-info (get resources 0)) - (.set_requested_memoffheap topo-info (get resources 1)) - (.set_requested_cpu topo-info (get resources 2)) - (.set_assigned_memonheap topo-info (get resources 3)) - (.set_assigned_memoffheap topo-info (get resources 4)) - (.set_assigned_cpu topo-info (get resources 5))) - (when-let [component->debug (:component->debug base)] - (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug))) - (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus)) - topo-info)) - - (^TopologyInfo getTopologyInfo [this ^String topology-id] - (mark! nimbus:num-getTopologyInfo-calls) - (.getTopologyInfoWithOpts this - topology-id - (doto (GetInfoOptions.) 
(.set_num_err_choice NumErrorsChoice/ALL)))) - - (^String beginCreateBlob [this - ^String blob-key - ^SettableBlobMeta blob-meta] - (let [session-id (uuid)] - (.put (:blob-uploaders nimbus) - session-id - (.createBlob (:blob-store nimbus) blob-key blob-meta (get-subject))) - (log-message "Created blob for " blob-key - " with session id " session-id) - (str session-id))) - - (^String beginUpdateBlob [this ^String blob-key] - (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus) - blob-key (get-subject))] - (let [session-id (uuid)] - (.put (:blob-uploaders nimbus) session-id os) - (log-message "Created upload session for " blob-key - " with id " session-id) - (str session-id)))) - - (^void createStateInZookeeper [this ^String blob-key] - (let [storm-cluster-state (:storm-cluster-state nimbus) - blob-store (:blob-store nimbus) - nimbus-host-port-info (:nimbus-host-port-info nimbus) - conf (:conf nimbus)] - (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf))) - (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info))) - - (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk] - (let [uploaders (:blob-uploaders nimbus)] - (if-let [^AtomicOutputStream os (.get uploaders session)] - (let [chunk-array (.array blob-chunk) - remaining (.remaining blob-chunk) - array-offset (.arrayOffset blob-chunk) - position (.position blob-chunk)] - (.write os chunk-array (+ array-offset position) remaining) - (.put uploaders session os)) - (throw-runtime "Blob for session " - session - " does not exist (or timed out)")))) - - (^void finishBlobUpload [this ^String session] - (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)] - (do - (.close os) - (log-message "Finished uploading blob for session " - session - ". 
Closing session.") - (.remove (:blob-uploaders nimbus) session)) - (throw-runtime "Blob for session " - session - " does not exist (or timed out)"))) - - (^void cancelBlobUpload [this ^String session] - (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)] - (do - (.cancel os) - (log-message "Canceled uploading blob for session " - session - ". Closing session.") - (.remove (:blob-uploaders nimbus) session)) - (throw-runtime "Blob for session " - session - " does not exist (or timed out)"))) - - (^ReadableBlobMeta getBlobMeta [this ^String blob-key] - (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus) - blob-key (get-subject))] - ret)) - - (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta] - (->> (ReqContext/context) - (.subject) - (.setBlobMeta (:blob-store nimbus) blob-key blob-meta))) - - (^BeginDownloadResult beginBlobDownload [this ^String blob-key] - (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus) - blob-key (get-subject))] - (let [session-id (uuid) - ret (BeginDownloadResult. (.getVersion is) (str session-id))] - (.set_data_size ret (.getFileLength is)) - (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536)))) - (log-message "Created download session for " blob-key - " with id " session-id) - ret))) - - (^ByteBuffer downloadBlobChunk [this ^String session] - (let [downloaders (:blob-downloaders nimbus) - ^BufferInputStream is (.get downloaders session)] - (when-not is - (throw (RuntimeException. - "Could not find input stream for session " session))) - (let [ret (.read is)] - (.put downloaders session is) - (when (empty? 
ret) - (.close is) - (.remove downloaders session)) - (log-debug "Sending " (alength ret) " bytes") - (ByteBuffer/wrap ret)))) - - (^void deleteBlob [this ^String blob-key] - (let [subject (->> (ReqContext/context) - (.subject))] - (.deleteBlob (:blob-store nimbus) blob-key subject) - (when (instance? LocalFsBlobStore blob-store) - (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key) - (.remove-key-version! (:storm-cluster-state nimbus) blob-key)) - (log-message "Deleted blob for key " blob-key))) - - (^ListBlobsResult listBlobs [this ^String session] - (let [listers (:blob-listers nimbus) - ^Iterator keys-it (if (clojure.string/blank? session) - (.listKeys (:blob-store nimbus)) - (.get listers session)) - _ (or keys-it (throw-runtime "Blob list for session " - session - " does not exist (or timed out)")) - - ;; Create a new session id if the user gave an empty session string. - ;; This is the use case when the user wishes to list blobs - ;; starting from the beginning. - session (if (clojure.string/blank? session) - (let [new-session (uuid)] - (log-message "Creating new session for downloading list " new-session) - new-session) - session)] - (if-not (.hasNext keys-it) - (do - (.remove listers session) - (log-message "No more blobs to list for session " session) - ;; A blank result communicates that there are no more blobs. - (ListBlobsResult. (ArrayList. 0) (str session))) - (let [^List list-chunk (->> keys-it - (iterator-seq) - (take 100) ;; Limit to next 100 keys - (ArrayList.))] - (log-message session " downloading " (.size list-chunk) " entries") - (.put listers session keys-it) - (ListBlobsResult. 
list-chunk (str session)))))) - - (^int getBlobReplication [this ^String blob-key] - (->> (ReqContext/context) - (.subject) - (.getBlobReplication (:blob-store nimbus) blob-key))) - - (^int updateBlobReplication [this ^String blob-key ^int replication] - (->> (ReqContext/context) - (.subject) - (.updateBlobReplication (:blob-store nimbus) blob-key replication))) - - (^TopologyPageInfo getTopologyPageInfo - [this ^String topo-id ^String window ^boolean include-sys?] - (mark! nimbus:num-getTopologyPageInfo-calls) - (let [info (get-common-topo-info topo-id "getTopologyPageInfo") - - exec->node+port (:executor->node+port (:assignment info)) - last-err-fn (partial get-last-error - (:storm-cluster-state info) - topo-id) - topo-page-info (stats/agg-topo-execs-stats topo-id - exec->node+port - (:task->component info) - (:beats info) - (:topology info) - window - include-sys? - last-err-fn)] - (when-let [owner (:owner (:base info))] - (.set_owner topo-page-info owner)) - (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)] - (.set_sched_status topo-page-info sched-status)) - (when-let [resources (.get @(:id->resources nimbus) topo-id)] - (.set_requested_memonheap topo-page-info (get resources 0)) - (.set_requested_memoffheap topo-page-info (get resources 1)) - (.set_requested_cpu topo-page-info (get resources 2)) - (.set_assigned_memonheap topo-page-info (get resources 3)) - (.set_assigned_memoffheap topo-page-info (get resources 4)) - (.set_assigned_cpu topo-page-info (get resources 5))) - (doto topo-page-info - (.set_name (:storm-name info)) - (.set_status (extract-status-str (:base info))) - (.set_uptime_secs (time-delta (:launch-time-secs info))) - (.set_topology_conf (to-json (try-read-storm-conf conf - topo-id (:blob-store nimbus)))) - (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus))) - (when-let [debug-options - (get-in info [:base :component->debug topo-id])] - (.set_debug_options - topo-page-info - 
(converter/thriftify-debugoptions debug-options))) - topo-page-info)) - - (^ComponentPageInfo getComponentPageInfo - [this - ^String topo-id - ^String component-id - ^String window - ^boolean include-sys?] - (mark! nimbus:num-getComponentPageInfo-calls) - (let [info (get-common-topo-info topo-id "getComponentPageInfo") - {:keys [executor->node+port node->host]} (:assignment info) - executor->host+port (map-val (fn [[node port]] - [(node->host node) port]) - executor->node+port) - comp-page-info (stats/agg-comp-execs-stats executor->host+port - (:task->component info) - (:beats info) - window - include-sys? - topo-id - (:topology info) - component-id)] - (doto comp-page-info - (.set_topology_name (:storm-name info)) - (.set_errors (get-errors (:storm-cluster-state info) - topo-id - component-id)) - (.set_topology_status (extract-status-str (:base info)))) - (when-let [debug-options - (get-in info [:base :component->debug component-id])] - (.set_debug_options - comp-page-info - (converter/thriftify-debugoptions debug-options))) - ;; Add the event logger details - (let [component->tasks (reverse-map (:task->component info))] - (if (contains? component->tasks EVENTLOGGER-COMPONENT-ID) - (let [eventlogger-tasks (sort (get component->tasks - EVENTLOGGER-COMPONENT-ID)) - ;; Find the task the events from this component route to. - task-index (mod (TupleUtils/listHashCode [component-id]) - (count eventlogger-tasks)) - task-id (nth eventlogger-tasks task-index) - eventlogger-exec (first (filter (fn [[start stop]] -
<TRUNCATED>
