http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj deleted file mode 100644 index 61ddfa9..0000000 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ /dev/null @@ -1,1273 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns backtype.storm.ui.core - (:use compojure.core) - (:use [clojure.java.shell :only [sh]]) - (:use ring.middleware.reload - ring.middleware.multipart-params) - (:use [ring.middleware.json :only [wrap-json-params]]) - (:use [hiccup core page-helpers]) - (:use [backtype.storm config util log stats zookeeper converter]) - (:use [backtype.storm.ui helpers]) - (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID - ACKER-FAIL-STREAM-ID mk-authorization-handler - start-metrics-reporters]]]) - (:import [backtype.storm.utils Utils] - [backtype.storm.generated NimbusSummary]) - (:use [clojure.string :only [blank? lower-case trim split]]) - (:import [backtype.storm.generated ExecutorSpecificStats - ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats - ErrorInfo ClusterSummary SupervisorSummary TopologySummary - Nimbus$Client StormTopology GlobalStreamId RebalanceOptions - KillOptions GetInfoOptions NumErrorsChoice DebugOptions TopologyPageInfo - TopologyStats CommonAggregateStats ComponentAggregateStats - ComponentType BoltAggregateStats SpoutAggregateStats - ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo - LogConfig LogLevel LogLevelAction]) - (:import [backtype.storm.security.auth AuthUtils ReqContext]) - (:import [backtype.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo]) - (:import [backtype.storm.security.auth AuthUtils]) - (:import [backtype.storm.utils VersionInfo]) - (:import [backtype.storm Config]) - (:import [java.io File]) - (:require [compojure.route :as route] - [compojure.handler :as handler] - [ring.util.response :as resp] - [backtype.storm [thrift :as thrift]]) - (:require [metrics.meters :refer [defmeter mark!]]) - (:import [org.apache.commons.lang StringEscapeUtils]) - (:import [org.apache.logging.log4j Level]) - (:gen-class)) - -(def ^:dynamic *STORM-CONF* (read-storm-config)) -(def ^:dynamic *UI-ACL-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-AUTHORIZER) *STORM-CONF*)) -(def ^:dynamic *UI-IMPERSONATION-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER) *STORM-CONF*)) -(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*)) -(def STORM-VERSION (VersionInfo/getVersion)) - -(defmeter ui:num-cluster-configuration-http-requests) -(defmeter ui:num-cluster-summary-http-requests) -(defmeter ui:num-nimbus-summary-http-requests) -(defmeter ui:num-supervisor-summary-http-requests) -(defmeter ui:num-all-topologies-summary-http-requests) -(defmeter ui:num-topology-page-http-requests) -(defmeter ui:num-build-visualization-http-requests) -(defmeter ui:num-mk-visualization-data-http-requests) -(defmeter ui:num-component-page-http-requests) -(defmeter ui:num-log-config-http-requests) -(defmeter ui:num-activate-topology-http-requests) -(defmeter ui:num-deactivate-topology-http-requests) -(defmeter ui:num-debug-topology-http-requests) -(defmeter ui:num-component-op-response-http-requests) -(defmeter ui:num-topology-op-response-http-requests) -(defmeter ui:num-topology-op-response-http-requests) -(defmeter ui:num-topology-op-response-http-requests) -(defmeter ui:num-main-page-http-requests) - -(defn assert-authorized-user - ([op] - (assert-authorized-user op nil)) - ([op topology-conf] - (let [context (ReqContext/context)] - (if (.isImpersonating context) - (if *UI-IMPERSONATION-HANDLER* - (if-not (.permit *UI-IMPERSONATION-HANDLER* context op topology-conf) - (let [principal (.principal context) - real-principal (.realPrincipal context) - user (if principal (.getName principal) "unknown") - real-user (if real-principal (.getName real-principal) "unknown") - remote-address (.remoteAddress context)] - (throw (AuthorizationException. - (str "user '" real-user "' is not authorized to impersonate user '" user "' from host '" remote-address "'. Please - see SECURITY.MD to learn how to configure impersonation ACL."))))) - (log-warn " principal " (.realPrincipal context) " is trying to impersonate " (.principal context) " but " - NIMBUS-IMPERSONATION-AUTHORIZER " has no authorizer configured. This is a potential security hole. - Please see SECURITY.MD to learn how to configure an impersonation authorizer."))) - - (if *UI-ACL-HANDLER* - (if-not (.permit *UI-ACL-HANDLER* context op topology-conf) - (let [principal (.principal context) - user (if principal (.getName principal) "unknown")] - (throw (AuthorizationException. - (str "UI request '" op "' for '" user "' user is not authorized"))))))))) - - -(defn assert-authorized-profiler-action - [op] - (if-not (*STORM-CONF* WORKER-PROFILER-ENABLED) - (throw (AuthorizationException. - (str "UI request for profiler action '" op "' is disabled."))))) - - -(defn executor-summary-type - [topology ^ExecutorSummary s] - (component-type topology (.get_component_id s))) - -(defn is-ack-stream - [stream] - (let [acker-streams - [ACKER-INIT-STREAM-ID - ACKER-ACK-STREAM-ID - ACKER-FAIL-STREAM-ID]] - (every? #(not= %1 stream) acker-streams))) - -(defn spout-summary? - [topology s] - (= :spout (executor-summary-type topology s))) - -(defn bolt-summary? - [topology s] - (= :bolt (executor-summary-type topology s))) - -(defn group-by-comp - [summs] - (let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)] - (into (sorted-map) ret ))) - -(defn logviewer-link [host fname secure?] - (if (and secure? (*STORM-CONF* LOGVIEWER-HTTPS-PORT)) - (url-format "https://%s:%s/log?file=%s" - host - (*STORM-CONF* LOGVIEWER-HTTPS-PORT) - fname) - (url-format "http://%s:%s/log?file=%s" - host - (*STORM-CONF* LOGVIEWER-PORT) - fname))) - -(defn event-log-link - [topology-id component-id host port secure?] - (logviewer-link host (event-logs-filename topology-id port) secure?)) - -(defn worker-log-link [host port topology-id secure?] - (let [fname (logs-filename topology-id port)] - (logviewer-link host fname secure?))) - -(defn nimbus-log-link [host port] - (url-format "http://%s:%s/daemonlog?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port)) - -(defn get-error-time - [error] - (if error - (time-delta (.get_error_time_secs ^ErrorInfo error)))) - -(defn get-error-data - [error] - (if error - (error-subset (.get_error ^ErrorInfo error)) - "")) - -(defn get-error-port - [error] - (if error - (.get_port ^ErrorInfo error) - "")) - -(defn get-error-host - [error] - (if error - (.get_host ^ErrorInfo error) - "")) - -(defn get-error-time - [error] - (if error - (.get_error_time_secs ^ErrorInfo error) - "")) - -(defn worker-dump-link [host port topology-id] - (url-format "http://%s:%s/dumps/%s/%s" - (url-encode host) - (*STORM-CONF* LOGVIEWER-PORT) - (url-encode topology-id) - (str (url-encode host) ":" (url-encode port)))) - -(defn stats-times - [stats-map] - (sort-by #(Integer/parseInt %) - (-> stats-map - clojurify-structure - (dissoc ":all-time") - keys))) - -(defn window-hint - [window] - (if (= window ":all-time") - "All time" - (pretty-uptime-sec window))) - -(defn sanitize-stream-name - [name] - (let [sym-regex #"(?![A-Za-z_\-:\.])."] - (str - (if (re-find #"^[A-Za-z]" name) - (clojure.string/replace name sym-regex "_") - (clojure.string/replace (str \s name) sym-regex "_")) - (hash name)))) - -(defn sanitize-transferred - [transferred] - (into {} - (for [[time, stream-map] transferred] - [time, (into {} - (for [[stream, trans] stream-map] - [(sanitize-stream-name stream), trans]))]))) - -(defn visualization-data - [spout-bolt spout-comp-summs bolt-comp-summs window storm-id] - (let [components (for [[id spec] spout-bolt] - [id - (let [inputs (.get_inputs (.get_common spec)) - bolt-summs (get bolt-comp-summs id) - spout-summs (get spout-comp-summs id) - bolt-cap (if bolt-summs - (compute-bolt-capacity bolt-summs) - 0)] - {:type (if bolt-summs "bolt" "spout") - :capacity bolt-cap - :latency (if bolt-summs - (get-in - (bolt-streams-stats bolt-summs true) - [:process-latencies window]) - (get-in - (spout-streams-stats spout-summs true) - [:complete-latencies window])) - :transferred (or - (get-in - (spout-streams-stats spout-summs true) - [:transferred window]) - (get-in - (bolt-streams-stats bolt-summs true) - [:transferred window])) - :stats (let [mapfn (fn [dat] - (map (fn [^ExecutorSummary summ] - {:host (.get_host summ) - :port (.get_port summ) - :uptime_secs (.get_uptime_secs summ) - :transferred (if-let [stats (.get_stats summ)] - (sanitize-transferred (.get_transferred stats)))}) - dat))] - (if bolt-summs - (mapfn bolt-summs) - (mapfn spout-summs))) - :link (url-format "/component.html?id=%s&topology_id=%s" id storm-id) - :inputs (for [[global-stream-id group] inputs] - {:component (.get_componentId global-stream-id) - :stream (.get_streamId global-stream-id) - :sani-stream (sanitize-stream-name (.get_streamId global-stream-id)) - :grouping (clojure.core/name (thrift/grouping-type group))})})])] - (into {} (doall components)))) - -(defn stream-boxes [datmap] - (let [filter-fn (mk-include-sys-fn true) - streams - (vec (doall (distinct - (apply concat - (for [[k v] datmap] - (for [m (get v :inputs)] - {:stream (get m :stream) - :sani-stream (get m :sani-stream) - :checked (is-ack-stream (get m :stream))}))))))] - (map (fn [row] - {:row row}) (partition 4 4 nil streams)))) - -(defn- get-topology-info - ([^Nimbus$Client nimbus id] - (.getTopologyInfo nimbus id)) - ([^Nimbus$Client nimbus id options] - (.getTopologyInfoWithOpts nimbus id options))) - -(defn mk-visualization-data - [id window include-sys?] - (thrift/with-configured-nimbus-connection - nimbus - (let [window (if window window ":all-time") - topology (.getTopology ^Nimbus$Client nimbus id) - spouts (.get_spouts topology) - bolts (.get_bolts topology) - summ (->> (doto - (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/NONE)) - (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id)) - execs (.get_executors summ) - spout-summs (filter (partial spout-summary? topology) execs) - bolt-summs (filter (partial bolt-summary? topology) execs) - spout-comp-summs (group-by-comp spout-summs) - bolt-comp-summs (group-by-comp bolt-summs) - bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) - bolt-comp-summs)] - (visualization-data - (merge (hashmap-to-persistent spouts) - (hashmap-to-persistent bolts)) - spout-comp-summs bolt-comp-summs window id)))) - -(defn validate-tplg-submit-params [params] - (let [tplg-jar-file (params :topologyJar) - tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))] - (cond - (nil? tplg-jar-file) {:valid false :error "missing topology jar file"} - (nil? tplg-config) {:valid false :error "missing topology config"} - (nil? (tplg-config "topologyMainClass")) {:valid false :error "topologyMainClass missing in topologyConfig"} - :else {:valid true}))) - -(defn run-tplg-submit-cmd [tplg-jar-file tplg-config user] - (let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass"))) - tplg-main-class-args (if (not-nil? tplg-config) (tplg-config "topologyMainClassArgs")) - storm-home (System/getProperty "storm.home") - storm-conf-dir (str storm-home file-path-separator "conf") - storm-log-dir (if (not-nil? (*STORM-CONF* "storm.log.dir")) (*STORM-CONF* "storm.log.dir") - (str storm-home file-path-separator "logs")) - storm-libs (str storm-home file-path-separator "lib" file-path-separator "*") - java-cmd (str (System/getProperty "java.home") file-path-separator "bin" file-path-separator "java") - storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm") - tplg-cmd-response (apply sh - (flatten - [storm-cmd "jar" tplg-jar-file tplg-main-class - (if (not-nil? tplg-main-class-args) tplg-main-class-args []) - (if (not= user "unknown") (str "-c storm.doAsUser=" user) [])]))] - (log-message "tplg-cmd-response " tplg-cmd-response) - (cond - (= (tplg-cmd-response :exit) 0) {"status" "success"} - (and (not= (tplg-cmd-response :exit) 0) - (not-nil? (re-find #"already exists on cluster" (tplg-cmd-response :err)))) {"status" "failed" "error" "Topology with the same name exists in cluster"} - (not= (tplg-cmd-response :exit) 0) {"status" "failed" "error" (clojure.string/trim-newline (tplg-cmd-response :err))} - :else {"status" "success" "response" "topology deployed"} - ))) - -(defn cluster-configuration [] - (thrift/with-configured-nimbus-connection nimbus - (.getNimbusConf ^Nimbus$Client nimbus))) - -(defn topology-history-info - ([user] - (thrift/with-configured-nimbus-connection nimbus - (topology-history-info (.getTopologyHistory ^Nimbus$Client nimbus user) user))) - ([history user] - {"topo-history" - (into [] (.get_topo_ids history))})) - -(defn cluster-summary - ([user] - (thrift/with-configured-nimbus-connection nimbus - (cluster-summary (.getClusterInfo ^Nimbus$Client nimbus) user))) - ([^ClusterSummary summ user] - (let [sups (.get_supervisors summ) - used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups)) - total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary %) sups)) - free-slots (- total-slots used-slots) - topologies (.get_topologies_size summ) - total-tasks (->> (.get_topologies summ) - (map #(.get_num_tasks ^TopologySummary %)) - (reduce +)) - total-executors (->> (.get_topologies summ) - (map #(.get_num_executors ^TopologySummary %)) - (reduce +))] - {"user" user - "stormVersion" STORM-VERSION - "supervisors" (count sups) - "topologies" topologies - "slotsTotal" total-slots - "slotsUsed" used-slots - "slotsFree" free-slots - "executorsTotal" total-executors - "tasksTotal" total-tasks }))) - -(defn convert-to-nimbus-summary[nimbus-seed] - (let [[host port] (.split nimbus-seed ":")] - { - "host" host - "port" port - "nimbusLogLink" (nimbus-log-link host port) - "status" "Offline" - "version" "Not applicable" - "nimbusUpTime" "Not applicable" - "nimbusUptimeSeconds" "Not applicable"} - )) - -(defn nimbus-summary - ([] - (thrift/with-configured-nimbus-connection nimbus - (nimbus-summary - (.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus))))) - ([nimbuses] - (let [nimbus-seeds (set (map #(str %1 ":" (*STORM-CONF* NIMBUS-THRIFT-PORT)) (set (*STORM-CONF* NIMBUS-SEEDS)))) - alive-nimbuses (set (map #(str (.get_host %1) ":" (.get_port %1)) nimbuses)) - offline-nimbuses (clojure.set/difference nimbus-seeds alive-nimbuses) - offline-nimbuses-summary (map #(convert-to-nimbus-summary %1) offline-nimbuses)] - {"nimbuses" - (concat offline-nimbuses-summary - (for [^NimbusSummary n nimbuses - :let [uptime (.get_uptime_secs n)]] - { - "host" (.get_host n) - "port" (.get_port n) - "nimbusLogLink" (nimbus-log-link (.get_host n) (.get_port n)) - "status" (if (.is_isLeader n) "Leader" "Not a Leader") - "version" (.get_version n) - "nimbusUpTime" (pretty-uptime-sec uptime) - "nimbusUpTimeSeconds" uptime}))}))) - -(defn supervisor-summary - ([] - (thrift/with-configured-nimbus-connection nimbus - (supervisor-summary - (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus))))) - ([summs] - {"supervisors" - (for [^SupervisorSummary s summs] - {"id" (.get_supervisor_id s) - "host" (.get_host s) - "uptime" (pretty-uptime-sec (.get_uptime_secs s)) - "uptimeSeconds" (.get_uptime_secs s) - "slotsTotal" (.get_num_workers s) - "slotsUsed" (.get_num_used_workers s) - "totalMem" (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB) - "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY) - "usedMem" (.get_used_mem s) - "usedCpu" (.get_used_cpu s) - "version" (.get_version s)}) - "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)})) - -(defn all-topologies-summary - ([] - (thrift/with-configured-nimbus-connection - nimbus - (all-topologies-summary - (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus))))) - ([summs] - {"topologies" - (for [^TopologySummary t summs] - { - "id" (.get_id t) - "encodedId" (url-encode (.get_id t)) - "owner" (.get_owner t) - "name" (.get_name t) - "status" (.get_status t) - "uptime" (pretty-uptime-sec (.get_uptime_secs t)) - "uptimeSeconds" (.get_uptime_secs t) - "tasksTotal" (.get_num_tasks t) - "workersTotal" (.get_num_workers t) - "executorsTotal" (.get_num_executors t) - "replicationCount" (.get_replication_count t) - "schedulerInfo" (.get_sched_status t) - "requestedMemOnHeap" (.get_requested_memonheap t) - "requestedMemOffHeap" (.get_requested_memoffheap t) - "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t)) - "requestedCpu" (.get_requested_cpu t) - "assignedMemOnHeap" (.get_assigned_memonheap t) - "assignedMemOffHeap" (.get_assigned_memoffheap t) - "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t)) - "assignedCpu" (.get_assigned_cpu t)}) - "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)})) - -(defn topology-stats [window stats] - (let [times (stats-times (:emitted stats)) - display-map (into {} (for [t times] [t pretty-uptime-sec])) - display-map (assoc display-map ":all-time" (fn [_] "All time"))] - (for [w (concat times [":all-time"]) - :let [disp ((display-map w) w)]] - {"windowPretty" disp - "window" w - "emitted" (get-in stats [:emitted w]) - "transferred" (get-in stats [:transferred w]) - "completeLatency" (float-str (get-in stats [:complete-latencies w])) - "acked" (get-in stats [:acked w]) - "failed" (get-in stats [:failed w])}))) - -(defn build-visualization [id window include-sys?] - (thrift/with-configured-nimbus-connection nimbus - (let [window (if window window ":all-time") - topology-info (->> (doto - (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/ONE)) - (.getTopologyInfoWithOpts ^Nimbus$Client nimbus - id)) - storm-topology (.getTopology ^Nimbus$Client nimbus id) - spout-executor-summaries (filter (partial spout-summary? storm-topology) (.get_executors topology-info)) - bolt-executor-summaries (filter (partial bolt-summary? storm-topology) (.get_executors topology-info)) - spout-comp-id->executor-summaries (group-by-comp spout-executor-summaries) - bolt-comp-id->executor-summaries (group-by-comp bolt-executor-summaries) - bolt-comp-id->executor-summaries (filter-key (mk-include-sys-fn include-sys?) bolt-comp-id->executor-summaries) - id->spout-spec (.get_spouts storm-topology) - id->bolt (.get_bolts storm-topology) - visualizer-data (visualization-data (merge (hashmap-to-persistent id->spout-spec) - (hashmap-to-persistent id->bolt)) - spout-comp-id->executor-summaries - bolt-comp-id->executor-summaries - window - id)] - {"visualizationTable" (stream-boxes visualizer-data)}))) - -(defn- get-error-json - [topo-id error-info secure?] - (let [host (get-error-host error-info) - port (get-error-port error-info)] - {"lastError" (get-error-data error-info) - "errorTime" (get-error-time error-info) - "errorHost" host - "errorPort" port - "errorLapsedSecs" (get-error-time error-info) - "errorWorkerLogLink" (worker-log-link host port topo-id secure?)})) - -(defn- common-agg-stats-json - "Returns a JSON representation of a common aggregated statistics." - [^CommonAggregateStats common-stats] - {"executors" (.get_num_executors common-stats) - "tasks" (.get_num_tasks common-stats) - "emitted" (.get_emitted common-stats) - "transferred" (.get_transferred common-stats) - "acked" (.get_acked common-stats) - "failed" (.get_failed common-stats)}) - -(defmulti comp-agg-stats-json - "Returns a JSON representation of aggregated statistics." - (fn [_ _ [id ^ComponentAggregateStats s]] (.get_type s))) - -(defmethod comp-agg-stats-json ComponentType/SPOUT - [topo-id secure? [id ^ComponentAggregateStats s]] - (let [^SpoutAggregateStats ss (.. s get_specific_stats get_spout) - cs (.get_common_stats s)] - (merge - (common-agg-stats-json cs) - (get-error-json topo-id (.get_last_error s) secure?) - {"spoutId" id - "encodedSpoutId" (url-encode id) - "completeLatency" (float-str (.get_complete_latency_ms ss))}))) - -(defmethod comp-agg-stats-json ComponentType/BOLT - [topo-id secure? [id ^ComponentAggregateStats s]] - (let [^BoltAggregateStats ss (.. s get_specific_stats get_bolt) - cs (.get_common_stats s)] - (merge - (common-agg-stats-json cs) - (get-error-json topo-id (.get_last_error s) secure?) - {"boltId" id - "encodedBoltId" (url-encode id) - "capacity" (float-str (.get_capacity ss)) - "executeLatency" (float-str (.get_execute_latency_ms ss)) - "executed" (.get_executed ss) - "processLatency" (float-str (.get_process_latency_ms ss))}))) - -(defn- unpack-topology-page-info - "Unpacks the serialized object to data structures" - [^TopologyPageInfo topo-info window secure?] - (let [id (.get_id topo-info) - ^TopologyStats topo-stats (.get_topology_stats topo-info) - stat->window->number - {:emitted (.get_window_to_emitted topo-stats) - :transferred (.get_window_to_transferred topo-stats) - :complete-latencies (.get_window_to_complete_latencies_ms topo-stats) - :acked (.get_window_to_acked topo-stats) - :failed (.get_window_to_failed topo-stats)} - topo-stats (topology-stats window stat->window->number) - [debugEnabled - samplingPct] (if-let [debug-opts (.get_debug_options topo-info)] - [(.is_enable debug-opts) - (.get_samplingpct debug-opts)]) - uptime (.get_uptime_secs topo-info)] - {"id" id - "encodedId" (url-encode id) - "owner" (.get_owner topo-info) - "name" (.get_name topo-info) - "status" (.get_status topo-info) - "uptime" (pretty-uptime-sec uptime) - "uptimeSeconds" uptime - "tasksTotal" (.get_num_tasks topo-info) - "workersTotal" (.get_num_workers topo-info) - "executorsTotal" (.get_num_executors topo-info) - "schedulerInfo" (.get_sched_status topo-info) - "requestedMemOnHeap" (.get_requested_memonheap topo-info) - "requestedMemOffHeap" (.get_requested_memoffheap topo-info) - "requestedCpu" (.get_requested_cpu topo-info) - "assignedMemOnHeap" (.get_assigned_memonheap topo-info) - "assignedMemOffHeap" (.get_assigned_memoffheap topo-info) - "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info)) - "assignedCpu" (.get_assigned_cpu topo-info) - "topologyStats" topo-stats - "spouts" (map (partial comp-agg-stats-json id secure?) - (.get_id_to_spout_agg_stats topo-info)) - "bolts" (map (partial comp-agg-stats-json id secure?) - (.get_id_to_bolt_agg_stats topo-info)) - "configuration" (.get_topology_conf topo-info) - "debug" (or debugEnabled false) - "samplingPct" (or samplingPct 10) - "replicationCount" (.get_replication_count topo-info)})) - -(defn exec-host-port - [executors] - (for [^ExecutorSummary e executors] - {"host" (.get_host e) - "port" (.get_port e)})) - -(defn worker-host-port - "Get the set of all worker host/ports" - [id] - (thrift/with-configured-nimbus-connection nimbus - (distinct (exec-host-port (.get_executors (get-topology-info nimbus id)))))) - -(defn topology-page [id window include-sys? user secure?] - (thrift/with-configured-nimbus-connection nimbus - (let [window (if window window ":all-time") - window-hint (window-hint window) - topo-page-info (.getTopologyPageInfo ^Nimbus$Client nimbus - id - window - include-sys?) - topology-conf (from-json (.get_topology_conf topo-page-info)) - msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)] - (merge - (unpack-topology-page-info topo-page-info window secure?) - {"user" user - "window" window - "windowHint" window-hint - "msgTimeout" msg-timeout - "configuration" topology-conf - "visualizationTable" [] - "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)})))) - -(defn component-errors - [errors-list topology-id secure?] - (let [errors (->> errors-list - (sort-by #(.get_error_time_secs ^ErrorInfo %)) - reverse)] - {"componentErrors" - (for [^ErrorInfo e errors] - {"time" (* 1000 (long (.get_error_time_secs e))) - "errorHost" (.get_host e) - "errorPort" (.get_port e) - "errorWorkerLogLink" (worker-log-link (.get_host e) - (.get_port e) - topology-id - secure?) - "errorLapsedSecs" (get-error-time e) - "error" (.get_error e)})})) - -(defmulti unpack-comp-agg-stat - (fn [[_ ^ComponentAggregateStats s]] (.get_type s))) - -(defmethod unpack-comp-agg-stat ComponentType/BOLT - [[window ^ComponentAggregateStats s]] - (let [^CommonAggregateStats comm-s (.get_common_stats s) - ^SpecificAggregateStats spec-s (.get_specific_stats s) - ^BoltAggregateStats bolt-s (.get_bolt spec-s)] - {"window" window - "windowPretty" (window-hint window) - "emitted" (.get_emitted comm-s) - "transferred" (.get_transferred comm-s) - "acked" (.get_acked comm-s) - "failed" (.get_failed comm-s) - "executeLatency" (float-str (.get_execute_latency_ms bolt-s)) - "processLatency" (float-str (.get_process_latency_ms bolt-s)) - "executed" (.get_executed bolt-s) - "capacity" (float-str (.get_capacity bolt-s))})) - -(defmethod unpack-comp-agg-stat ComponentType/SPOUT - [[window ^ComponentAggregateStats s]] - (let [^CommonAggregateStats comm-s (.get_common_stats s) - ^SpecificAggregateStats spec-s (.get_specific_stats s) - ^SpoutAggregateStats spout-s (.get_spout spec-s)] - {"window" window - "windowPretty" (window-hint window) - "emitted" (.get_emitted comm-s) - "transferred" (.get_transferred comm-s) - "acked" (.get_acked comm-s) - "failed" (.get_failed comm-s) - "completeLatency" (float-str (.get_complete_latency_ms spout-s))})) - -(defn- unpack-bolt-input-stat - [[^GlobalStreamId s ^ComponentAggregateStats stats]] - (let [^SpecificAggregateStats sas (.get_specific_stats stats) - ^BoltAggregateStats bas (.get_bolt sas) - ^CommonAggregateStats cas (.get_common_stats stats) - comp-id (.get_componentId s)] - {"component" comp-id - "encodedComponentId" (url-encode comp-id) - "stream" (.get_streamId s) - "executeLatency" (float-str (.get_execute_latency_ms bas)) - "processLatency" (float-str (.get_process_latency_ms bas)) - "executed" (nil-to-zero (.get_executed bas)) - "acked" (nil-to-zero (.get_acked cas)) - "failed" (nil-to-zero (.get_failed cas))})) - -(defmulti unpack-comp-output-stat - (fn [[_ ^ComponentAggregateStats s]] (.get_type s))) - -(defmethod unpack-comp-output-stat ComponentType/BOLT - [[stream-id ^ComponentAggregateStats stats]] - (let [^CommonAggregateStats cas (.get_common_stats stats)] - {"stream" stream-id - "emitted" (nil-to-zero (.get_emitted cas)) - "transferred" (nil-to-zero (.get_transferred cas))})) - -(defmethod unpack-comp-output-stat ComponentType/SPOUT - [[stream-id ^ComponentAggregateStats stats]] - (let [^CommonAggregateStats cas (.get_common_stats stats) - ^SpecificAggregateStats spec-s (.get_specific_stats stats) - ^SpoutAggregateStats spout-s (.get_spout spec-s)] - {"stream" stream-id - "emitted" (nil-to-zero (.get_emitted cas)) - "transferred" (nil-to-zero (.get_transferred cas)) - "completeLatency" (float-str (.get_complete_latency_ms spout-s)) - "acked" (nil-to-zero (.get_acked cas)) - "failed" (nil-to-zero (.get_failed cas))})) - -(defmulti unpack-comp-exec-stat - (fn [_ _ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas)))) - -(defmethod unpack-comp-exec-stat ComponentType/BOLT - [topology-id secure? ^ExecutorAggregateStats eas] - (let [^ExecutorSummary summ (.get_exec_summary eas) - ^ExecutorInfo info (.get_executor_info summ) - ^ComponentAggregateStats stats (.get_stats eas) - ^SpecificAggregateStats ss (.get_specific_stats stats) - ^BoltAggregateStats bas (.get_bolt ss) - ^CommonAggregateStats cas (.get_common_stats stats) - host (.get_host summ) - port (.get_port summ) - exec-id (pretty-executor-info info) - uptime (.get_uptime_secs summ)] - {"id" exec-id - "encodedId" (url-encode exec-id) - "uptime" (pretty-uptime-sec uptime) - "uptimeSeconds" uptime - "host" host - "port" port - "emitted" (nil-to-zero (.get_emitted cas)) - "transferred" (nil-to-zero (.get_transferred cas)) - "capacity" (float-str (nil-to-zero (.get_capacity bas))) - "executeLatency" (float-str (.get_execute_latency_ms bas)) - "executed" (nil-to-zero (.get_executed bas)) - "processLatency" (float-str (.get_process_latency_ms bas)) - "acked" (nil-to-zero (.get_acked cas)) - "failed" (nil-to-zero (.get_failed cas)) - "workerLogLink" (worker-log-link host port topology-id secure?)})) - -(defmethod unpack-comp-exec-stat ComponentType/SPOUT - [topology-id secure? ^ExecutorAggregateStats eas] - (let [^ExecutorSummary summ (.get_exec_summary eas) - ^ExecutorInfo info (.get_executor_info summ) - ^ComponentAggregateStats stats (.get_stats eas) - ^SpecificAggregateStats ss (.get_specific_stats stats) - ^SpoutAggregateStats sas (.get_spout ss) - ^CommonAggregateStats cas (.get_common_stats stats) - host (.get_host summ) - port (.get_port summ) - exec-id (pretty-executor-info info) - uptime (.get_uptime_secs summ)] - {"id" exec-id - "encodedId" (url-encode exec-id) - "uptime" (pretty-uptime-sec uptime) - "uptimeSeconds" uptime - "host" host - "port" port - "emitted" (nil-to-zero (.get_emitted cas)) - "transferred" (nil-to-zero (.get_transferred cas)) - "completeLatency" (float-str (.get_complete_latency_ms sas)) - "acked" (nil-to-zero (.get_acked cas)) - "failed" (nil-to-zero (.get_failed cas)) - "workerLogLink" (worker-log-link host port topology-id secure?)})) - -(defmulti unpack-component-page-info - "Unpacks component-specific info to clojure data structures" - (fn [^ComponentPageInfo info & _] - (.get_component_type info))) - -(defmethod unpack-component-page-info ComponentType/BOLT - [^ComponentPageInfo info topology-id window include-sys? secure?] - (merge - {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info)) - "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info)) - "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info)) - "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?) - (.get_exec_stats info))} - (-> info .get_errors (component-errors topology-id secure?)))) - -(defmethod unpack-component-page-info ComponentType/SPOUT - [^ComponentPageInfo info topology-id window include-sys? secure?] - (merge - {"spoutSummary" (map unpack-comp-agg-stat (.get_window_to_stats info)) - "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info)) - "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?) - (.get_exec_stats info))} - (-> info .get_errors (component-errors topology-id secure?)))) - -(defn get-active-profile-actions - [nimbus topology-id component] - (let [profile-actions (.getComponentPendingProfileActions nimbus - topology-id - component - ProfileAction/JPROFILE_STOP) - latest-profile-actions (map clojurify-profile-request profile-actions) - active-actions (map (fn [profile-action] - {"host" (:host profile-action) - "port" (str (:port profile-action)) - "dumplink" (worker-dump-link (:host profile-action) (str (:port profile-action)) topology-id) - "timestamp" (str (- (:timestamp profile-action) (System/currentTimeMillis)))}) - latest-profile-actions)] - (log-message "Latest-active actions are: " (pr active-actions)) - active-actions)) - -(defn component-page - [topology-id component window include-sys? user secure?] - (thrift/with-configured-nimbus-connection nimbus - (let [window (or window ":all-time") - window-hint (window-hint window) - comp-page-info (.getComponentPageInfo ^Nimbus$Client nimbus - topology-id - component - window - include-sys?) - topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus - topology-id)) - msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS) - [debugEnabled - samplingPct] (if-let [debug-opts (.get_debug_options comp-page-info)] - [(.is_enable debug-opts) - (.get_samplingpct debug-opts)])] - (assoc - (unpack-component-page-info comp-page-info - topology-id - window - include-sys? - secure?) - "user" user - "id" component - "encodedId" (url-encode component) - "name" (.get_topology_name comp-page-info) - "executors" (.get_num_executors comp-page-info) - "tasks" (.get_num_tasks comp-page-info) - "topologyId" topology-id - "topologyStatus" (.get_topology_status comp-page-info) - "encodedTopologyId" (url-encode topology-id) - "window" window - "componentType" (-> comp-page-info .get_component_type str lower-case) - "windowHint" window-hint - "debug" (or debugEnabled false) - "samplingPct" (or samplingPct 10) - "eventLogLink" (event-log-link topology-id - component - (.get_eventlog_host comp-page-info) - (.get_eventlog_port comp-page-info) - secure?) - "profileActionEnabled" (*STORM-CONF* WORKER-PROFILER-ENABLED) - "profilerActive" (if (*STORM-CONF* WORKER-PROFILER-ENABLED) - (get-active-profile-actions nimbus topology-id component) - []))))) - -(defn- level-to-dict [level] - (if level - (let [timeout (.get_reset_log_level_timeout_secs level) - timeout-epoch (.get_reset_log_level_timeout_epoch level) - target-level (.get_target_log_level level) - reset-level (.get_reset_log_level level)] - {"target_level" (.toString (Level/toLevel target-level)) - "reset_level" (.toString (Level/toLevel reset-level)) - "timeout" timeout - "timeout_epoch" timeout-epoch}))) - -(defn log-config [topology-id] - (thrift/with-configured-nimbus-connection - nimbus - (let [log-config (.getLogConfig ^Nimbus$Client nimbus topology-id) - named-logger-levels (into {} - (for [[key val] (.get_named_logger_level log-config)] - [(str key) (level-to-dict val)]))] - {"namedLoggerLevels" named-logger-levels}))) - -(defn topology-config [topology-id] - (thrift/with-configured-nimbus-connection nimbus - (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id)))) - -(defn topology-op-response [topology-id op] - {"topologyOperation" op, - "topologyId" topology-id, - "status" "success" - }) - -(defn component-op-response [topology-id component-id op] - {"topologyOperation" op, - "topologyId" topology-id, - "componentId" component-id, - "status" "success" - }) - -(defn check-include-sys? - [sys?] - (if (or (nil? sys?) (= "false" sys?)) false true)) - -(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*)) - -(defn populate-context! - "Populate the Storm RequestContext from an servlet-request. This should be called in each handler" - [servlet-request] - (when http-creds-handler - (.populateContext http-creds-handler (ReqContext/context) servlet-request))) - -(defn get-user-name - [servlet-request] - (.getUserName http-creds-handler servlet-request)) - -(defroutes main-routes - (GET "/api/v1/cluster/configuration" [& m] - (mark! ui:num-cluster-configuration-http-requests) - (json-response (cluster-configuration) - (:callback m) :serialize-fn identity)) - (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m] - (mark! ui:num-cluster-summary-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getClusterInfo") - (let [user (get-user-name servlet-request)] - (json-response (assoc (cluster-summary user) - "bugtracker-url" (*STORM-CONF* UI-PROJECT-BUGTRACKER-URL) - "central-log-url" (*STORM-CONF* UI-CENTRAL-LOGGING-URL)) (:callback m)))) - (GET "/api/v1/nimbus/summary" [:as {:keys [cookies servlet-request]} & m] - (mark! ui:num-nimbus-summary-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getClusterInfo") - (json-response (nimbus-summary) (:callback m))) - (GET "/api/v1/history/summary" [:as {:keys [cookies servlet-request]} & m] - (let [user (.getUserName http-creds-handler servlet-request)] - (json-response (topology-history-info user) (:callback m)))) - (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} & m] - (mark! ui:num-supervisor-summary-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getClusterInfo") - (json-response (assoc (supervisor-summary) - "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m))) - (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m] - (mark! ui:num-all-topologies-summary-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getClusterInfo") - (json-response (all-topologies-summary) (:callback m))) - (GET "/api/v1/topology-workers/:id" [:as {:keys [cookies servlet-request]} id & m] - (let [id (url-decode id)] - (json-response {"hostPortList" (worker-host-port id) - "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)} (:callback m)))) - (GET "/api/v1/topology/:id" [:as {:keys [cookies servlet-request scheme]} id & m] - (mark! ui:num-topology-page-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getTopology" (topology-config id)) - (let [user (get-user-name servlet-request)] - (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) (:callback m)))) - (GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m] - (mark! ui:num-build-visualization-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getTopology" (topology-config id)) - (json-response (build-visualization id (:window m) (check-include-sys? (:sys m))) (:callback m))) - (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m] - (mark! ui:num-mk-visualization-data-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getTopology" (topology-config id)) - (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m))) - (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request scheme]} id component & m] - (mark! ui:num-component-page-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getTopology" (topology-config id)) - (let [user (get-user-name servlet-request)] - (json-response - (component-page id component (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) - (:callback m)))) - (GET "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id & m] - (mark! ui:num-log-config-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "getTopology" (topology-config id)) - (json-response (log-config id) (:callback m))) - (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m] - (mark! ui:num-activate-topology-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "activate" (topology-config id)) - (thrift/with-configured-nimbus-connection nimbus - (let [tplg (->> (doto - (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/NONE)) - (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id)) - name (.get_name tplg)] - (.activate nimbus name) - (log-message "Activating topology '" name "'"))) - (json-response (topology-op-response id "activate") (m "callback"))) - (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id & m] - (mark! ui:num-deactivate-topology-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "deactivate" (topology-config id)) - (thrift/with-configured-nimbus-connection nimbus - (let [tplg (->> (doto - (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/NONE)) - (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id)) - name (.get_name tplg)] - (.deactivate nimbus name) - (log-message "Deactivating topology '" name "'"))) - (json-response (topology-op-response id "deactivate") (m "callback"))) - (POST "/api/v1/topology/:id/debug/:action/:spct" [:as {:keys [cookies servlet-request]} id action spct & m] - (mark! ui:num-debug-topology-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "debug" (topology-config id)) - (thrift/with-configured-nimbus-connection nimbus - (let [tplg (->> (doto - (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/NONE)) - (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id)) - name (.get_name tplg) - enable? (= "enable" action)] - (.debug nimbus name "" enable? (Integer/parseInt spct)) - (log-message "Debug topology [" name "] action [" action "] sampling pct [" spct "]"))) - (json-response (topology-op-response id (str "debug/" action)) (m "callback"))) - (POST "/api/v1/topology/:id/component/:component/debug/:action/:spct" [:as {:keys [cookies servlet-request]} id component action spct & m] - (mark! ui:num-component-op-response-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "debug" (topology-config id)) - (thrift/with-configured-nimbus-connection nimbus - (let [tplg (->> (doto - (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/NONE)) - (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id)) - name (.get_name tplg) - enable? (= "enable" action)] - (.debug nimbus name component enable? (Integer/parseInt spct)) - (log-message "Debug topology [" name "] component [" component "] action [" action "] sampling pct [" spct "]"))) - (json-response (component-op-response id component (str "/debug/" action)) (m "callback"))) - (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m] - (mark! ui:num-topology-op-response-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "rebalance" (topology-config id)) - (thrift/with-configured-nimbus-connection nimbus - (let [tplg (->> (doto - (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/NONE)) - (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id)) - name (.get_name tplg) - rebalance-options (m "rebalanceOptions") - options (RebalanceOptions.)] - (.set_wait_secs options (Integer/parseInt wait-time)) - (if (and (not-nil? rebalance-options) (contains? rebalance-options "numWorkers")) - (.set_num_workers options (Integer/parseInt (.toString (rebalance-options "numWorkers"))))) - (if (and (not-nil? rebalance-options) (contains? rebalance-options "executors")) - (doseq [keyval (rebalance-options "executors")] - (.put_to_num_executors options (key keyval) (Integer/parseInt (.toString (val keyval)))))) - (.rebalance nimbus name options) - (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs"))) - (json-response (topology-op-response id "rebalance") (m "callback"))) - (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m] - (mark! ui:num-topology-op-response-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "killTopology" (topology-config id)) - (thrift/with-configured-nimbus-connection nimbus - (let [tplg (->> (doto - (GetInfoOptions.) - (.set_num_err_choice NumErrorsChoice/NONE)) - (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id)) - name (.get_name tplg) - options (KillOptions.)] - (.set_wait_secs options (Integer/parseInt wait-time)) - (.killTopologyWithOpts nimbus name options) - (log-message "Killing topology '" name "' with wait time: " wait-time " secs"))) - (json-response (topology-op-response id "kill") (m "callback"))) - (POST "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id namedLoggerLevels & m] - (mark! ui:num-topology-op-response-http-requests) - (populate-context! servlet-request) - (assert-authorized-user "setLogConfig" (topology-config id)) - (thrift/with-configured-nimbus-connection - nimbus - (let [new-log-config (LogConfig.)] - (doseq [[key level] namedLoggerLevels] - (let [logger-name (str key) - target-level (.get level "target_level") - timeout (or (.get level "timeout") 0) - named-logger-level (LogLevel.)] - ;; if target-level is nil, do not set it, user wants to clear - (log-message "The target level for " logger-name " is " target-level) - (if (nil? target-level) - (do - (.set_action named-logger-level LogLevelAction/REMOVE) - (.unset_target_log_level named-logger-level)) - (do - (.set_action named-logger-level LogLevelAction/UPDATE) - ;; the toLevel here ensures the string we get is valid - (.set_target_log_level named-logger-level (.name (Level/toLevel target-level))) - (.set_reset_log_level_timeout_secs named-logger-level timeout))) - (log-message "Adding this " logger-name " " named-logger-level " to " new-log-config) - (.put_to_named_logger_level new-log-config logger-name named-logger-level))) - (log-message "Setting topology " id " log config " new-log-config) - (.setLogConfig nimbus id new-log-config) - (json-response (log-config id) (m "callback"))))) - - (GET "/api/v1/topology/:id/profiling/start/:host-port/:timeout" - [:as {:keys [servlet-request]} id host-port timeout & m] - (thrift/with-configured-nimbus-connection nimbus - (assert-authorized-user "setWorkerProfiler" (topology-config id)) - (assert-authorized-profiler-action "start") - (let [[host, port] (split host-port #":") - nodeinfo (NodeInfo. host (set [(Long. port)])) - timestamp (+ (System/currentTimeMillis) (* 60000 (Long. timeout))) - request (ProfileRequest. nodeinfo - ProfileAction/JPROFILE_STOP)] - (.set_time_stamp request timestamp) - (.setWorkerProfiler nimbus id request) - (json-response {"status" "ok" - "id" host-port - "timeout" timeout - "dumplink" (worker-dump-link - host - port - id)} - (m "callback"))))) - - (GET "/api/v1/topology/:id/profiling/stop/:host-port" - [:as {:keys [servlet-request]} id host-port & m] - (thrift/with-configured-nimbus-connection nimbus - (assert-authorized-user "setWorkerProfiler" (topology-config id)) - (assert-authorized-profiler-action "stop") - (let [[host, port] (split host-port #":") - nodeinfo (NodeInfo. host (set [(Long. port)])) - timestamp 0 - request (ProfileRequest. nodeinfo - ProfileAction/JPROFILE_STOP)] - (.set_time_stamp request timestamp) - (.setWorkerProfiler nimbus id request) - (json-response {"status" "ok" - "id" host-port} - (m "callback"))))) - - (GET "/api/v1/topology/:id/profiling/dumpprofile/:host-port" - [:as {:keys [servlet-request]} id host-port & m] - (thrift/with-configured-nimbus-connection nimbus - (assert-authorized-user "setWorkerProfiler" (topology-config id)) - (assert-authorized-profiler-action "dumpprofile") - (let [[host, port] (split host-port #":") - nodeinfo (NodeInfo. host (set [(Long. port)])) - timestamp (System/currentTimeMillis) - request (ProfileRequest. nodeinfo - ProfileAction/JPROFILE_DUMP)] - (.set_time_stamp request timestamp) - (.setWorkerProfiler nimbus id request) - (json-response {"status" "ok" - "id" host-port} - (m "callback"))))) - - (GET "/api/v1/topology/:id/profiling/dumpjstack/:host-port" - [:as {:keys [servlet-request]} id host-port & m] - (thrift/with-configured-nimbus-connection nimbus - (assert-authorized-user "setWorkerProfiler" (topology-config id)) - (assert-authorized-profiler-action "dumpjstack") - (let [[host, port] (split host-port #":") - nodeinfo (NodeInfo. host (set [(Long. port)])) - timestamp (System/currentTimeMillis) - request (ProfileRequest. nodeinfo - ProfileAction/JSTACK_DUMP)] - (.set_time_stamp request timestamp) - (.setWorkerProfiler nimbus id request) - (json-response {"status" "ok" - "id" host-port} - (m "callback"))))) - - (GET "/api/v1/topology/:id/profiling/restartworker/:host-port" - [:as {:keys [servlet-request]} id host-port & m] - (thrift/with-configured-nimbus-connection nimbus - (assert-authorized-user "setWorkerProfiler" (topology-config id)) - (assert-authorized-profiler-action "restartworker") - (let [[host, port] (split host-port #":") - nodeinfo (NodeInfo. host (set [(Long. port)])) - timestamp (System/currentTimeMillis) - request (ProfileRequest. nodeinfo - ProfileAction/JVM_RESTART)] - (.set_time_stamp request timestamp) - (.setWorkerProfiler nimbus id request) - (json-response {"status" "ok" - "id" host-port} - (m "callback"))))) - - (GET "/api/v1/topology/:id/profiling/dumpheap/:host-port" - [:as {:keys [servlet-request]} id host-port & m] - (thrift/with-configured-nimbus-connection nimbus - (assert-authorized-user "setWorkerProfiler" (topology-config id)) - (assert-authorized-profiler-action "dumpheap") - (let [[host, port] (split host-port #":") - nodeinfo (NodeInfo. host (set [(Long. port)])) - timestamp (System/currentTimeMillis) - request (ProfileRequest. nodeinfo - ProfileAction/JMAP_DUMP)] - (.set_time_stamp request timestamp) - (.setWorkerProfiler nimbus id request) - (json-response {"status" "ok" - "id" host-port} - (m "callback"))))) - - (GET "/" [:as {cookies :cookies}] - (mark! ui:num-main-page-http-requests) - (resp/redirect "/index.html")) - (route/resources "/") - (route/not-found "Page not found")) - -(defn catch-errors - [handler] - (fn [request] - (try - (handler request) - (catch Exception ex - (json-response (exception->json ex) ((:query-params request) "callback") :status 500))))) - -(def app - (handler/site (-> main-routes - (wrap-json-params) - (wrap-multipart-params) - (wrap-reload '[backtype.storm.ui.core]) - requests-middleware - catch-errors))) - -(defn start-server! - [] - (try - (let [conf *STORM-CONF* - header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES)) - filters-confs [{:filter-class (conf UI-FILTER) - :filter-params (conf UI-FILTER-PARAMS)}] - https-port (if (not-nil? (conf UI-HTTPS-PORT)) (conf UI-HTTPS-PORT) 0) - https-ks-path (conf UI-HTTPS-KEYSTORE-PATH) - https-ks-password (conf UI-HTTPS-KEYSTORE-PASSWORD) - https-ks-type (conf UI-HTTPS-KEYSTORE-TYPE) - https-key-password (conf UI-HTTPS-KEY-PASSWORD) - https-ts-path (conf UI-HTTPS-TRUSTSTORE-PATH) - https-ts-password (conf UI-HTTPS-TRUSTSTORE-PASSWORD) - https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE) - https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH) - https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)] - (start-metrics-reporters) - (storm-run-jetty {:port (conf UI-PORT) - :host (conf UI-HOST) - :https-port https-port - :configurator (fn [server] - (config-ssl server - https-port - https-ks-path - https-ks-password - https-ks-type - https-key-password - https-ts-path - https-ts-password - https-ts-type - https-need-client-auth - https-want-client-auth) - (doseq [connector (.getConnectors server)] - (.setRequestHeaderSize connector header-buffer-size)) - (config-filter server app filters-confs))})) - (catch Exception ex - (log-error ex)))) - -(defn -main - [] - (log-message "Starting ui server for storm version '" STORM-VERSION "'") - (start-server!))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/ui/helpers.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj deleted file mode 100644 index e0db5c8..0000000 --- a/storm-core/src/clj/backtype/storm/ui/helpers.clj +++ /dev/null @@ -1,240 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.ui.helpers - (:use compojure.core) - (:use [hiccup core page-helpers]) - (:use [clojure - [string :only [blank? join]] - [walk :only [keywordize-keys]]]) - (:use [backtype.storm config log]) - (:use [backtype.storm.util :only [clojurify-structure uuid defnk to-json url-encode not-nil?]]) - (:use [clj-time coerce format]) - (:import [backtype.storm.generated ExecutorInfo ExecutorSummary]) - (:import [backtype.storm.logging.filters AccessLoggingFilter]) - (:import [java.util EnumSet]) - (:import [org.eclipse.jetty.server Server] - [org.eclipse.jetty.server.nio SelectChannelConnector] - [org.eclipse.jetty.server.ssl SslSocketConnector] - [org.eclipse.jetty.servlet ServletHolder FilterMapping] - [org.eclipse.jetty.util.ssl SslContextFactory] - [org.eclipse.jetty.server DispatcherType] - [org.eclipse.jetty.servlets CrossOriginFilter]) - (:require [ring.util servlet]) - (:require [compojure.route :as route] - [compojure.handler :as handler]) - (:require [metrics.meters :refer [defmeter mark!]])) - -(defmeter num-web-requests) -(defn requests-middleware - "Coda Hale metric for counting the number of web requests." - [handler] - (fn [req] - (mark! num-web-requests) - (handler req))) - -(defn split-divide [val divider] - [(Integer. (int (/ val divider))) (mod val divider)] - ) - -(def PRETTY-SEC-DIVIDERS - [["s" 60] - ["m" 60] - ["h" 24] - ["d" nil]]) - -(def PRETTY-MS-DIVIDERS - (cons ["ms" 1000] - PRETTY-SEC-DIVIDERS)) - -(defn pretty-uptime-str* [val dividers] - (let [val (if (string? val) (Integer/parseInt val) val) - vals (reduce (fn [[state val] [_ divider]] - (if (pos? val) - (let [[divided mod] (if divider - (split-divide val divider) - [nil val])] - [(concat state [mod]) - divided] - ) - [state val] - )) - [[] val] - dividers) - strs (->> - (first vals) - (map - (fn [[suffix _] val] - (str val suffix)) - dividers - ))] - (join " " (reverse strs)) - )) - -(defn pretty-uptime-sec [secs] - (pretty-uptime-str* secs PRETTY-SEC-DIVIDERS)) - -(defn pretty-uptime-ms [ms] - (pretty-uptime-str* ms PRETTY-MS-DIVIDERS)) - - -(defelem table [headers-map data] - [:table - [:thead - [:tr - (for [h headers-map] - [:th (if (:text h) [:span (:attr h) (:text h)] h)]) - ]] - [:tbody - (for [row data] - [:tr - (for [col row] - [:td col] - )] - )] - ]) - -(defn url-format [fmt & args] - (String/format fmt - (to-array (map #(url-encode (str %)) args)))) - -(defn pretty-executor-info [^ExecutorInfo e] - (str "[" (.get_task_start e) "-" (.get_task_end e) "]")) - -(defn unauthorized-user-json - [user] - {"error" "No Authorization" - "errorMessage" (str "User " user " is not authorized.")}) - -(defn unauthorized-user-html [user] - [[:h2 "User '" (escape-html user) "' is not authorized."]]) - -(defn- mk-ssl-connector [port ks-path ks-password ks-type key-password - ts-path ts-password ts-type need-client-auth want-client-auth] - (let [sslContextFactory (doto (SslContextFactory.) - (.setExcludeCipherSuites (into-array String ["SSL_RSA_WITH_RC4_128_MD5" "SSL_RSA_WITH_RC4_128_SHA"])) - (.setExcludeProtocols (into-array String ["SSLv3"])) - (.setAllowRenegotiate false) - (.setKeyStorePath ks-path) - (.setKeyStoreType ks-type) - (.setKeyStorePassword ks-password) - (.setKeyManagerPassword key-password))] - (if (and (not-nil? ts-path) (not-nil? ts-password) (not-nil? ts-type)) - (do - (.setTrustStore sslContextFactory ts-path) - (.setTrustStoreType sslContextFactory ts-type) - (.setTrustStorePassword sslContextFactory ts-password))) - (cond - need-client-auth (.setNeedClientAuth sslContextFactory true) - want-client-auth (.setWantClientAuth sslContextFactory true)) - (doto (SslSocketConnector. sslContextFactory) - (.setPort port)))) - - -(defn config-ssl [server port ks-path ks-password ks-type key-password - ts-path ts-password ts-type need-client-auth want-client-auth] - (when (> port 0) - (.addConnector server (mk-ssl-connector port ks-path ks-password ks-type key-password - ts-path ts-password ts-type need-client-auth want-client-auth)))) - -(defn cors-filter-handler - [] - (doto (org.eclipse.jetty.servlet.FilterHolder. (CrossOriginFilter.)) - (.setInitParameter CrossOriginFilter/ALLOWED_ORIGINS_PARAM "*") - (.setInitParameter CrossOriginFilter/ALLOWED_METHODS_PARAM "GET, POST, PUT") - (.setInitParameter CrossOriginFilter/ALLOWED_HEADERS_PARAM "X-Requested-With, X-Requested-By, Access-Control-Allow-Origin, Content-Type, Content-Length, Accept, Origin") - (.setInitParameter CrossOriginFilter/ACCESS_CONTROL_ALLOW_ORIGIN_HEADER "*") - )) - -(defn mk-access-logging-filter-handler [] - (org.eclipse.jetty.servlet.FilterHolder. (AccessLoggingFilter.))) - -(defn config-filter [server handler filters-confs] - (if filters-confs - (let [servlet-holder (ServletHolder. - (ring.util.servlet/servlet handler)) - context (doto (org.eclipse.jetty.servlet.ServletContextHandler. server "/") - (.addServlet servlet-holder "/"))] - (.addFilter context (cors-filter-handler) "/*" (EnumSet/allOf DispatcherType)) - (doseq [{:keys [filter-name filter-class filter-params]} filters-confs] - (if filter-class - (let [filter-holder (doto (org.eclipse.jetty.servlet.FilterHolder.) - (.setClassName filter-class) - (.setName (or filter-name filter-class)) - (.setInitParameters (or filter-params {})))] - (.addFilter context filter-holder "/*" FilterMapping/ALL)))) - (.addFilter context (mk-access-logging-filter-handler) "/*" (EnumSet/allOf DispatcherType)) - (.setHandler server context)))) - -(defn ring-response-from-exception [ex] - {:headers {} - :status 400 - :body (.getMessage ex)}) - -(defn- remove-non-ssl-connectors [server] - (doseq [c (.getConnectors server)] - (when-not (or (nil? c) (instance? SslSocketConnector c)) - (.removeConnector server c) - )) - server) - -;; Modified from ring.adapter.jetty 1.3.0 -(defn- jetty-create-server - "Construct a Jetty Server instance." - [options] - (let [connector (doto (SelectChannelConnector.) - (.setPort (options :port 80)) - (.setHost (options :host)) - (.setMaxIdleTime (options :max-idle-time 200000))) - server (doto (Server.) - (.addConnector connector) - (.setSendDateHeader true)) - https-port (options :https-port)] - (if (and (not-nil? https-port) (> https-port 0)) (remove-non-ssl-connectors server)) - server)) - -(defn storm-run-jetty - "Modified version of run-jetty - Assumes configurator sets handler." - [config] - {:pre [(:configurator config)]} - (let [#^Server s (jetty-create-server (dissoc config :configurator)) - configurator (:configurator config)] - (configurator s) - (.start s))) - -(defn wrap-json-in-callback [callback response] - (str callback "(" response ");")) - -(defnk json-response - [data callback :serialize-fn to-json :status 200 :headers {}] - {:status status - :headers (merge {"Cache-Control" "no-cache, no-store" - "Access-Control-Allow-Origin" "*" - "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, X-Csrf-Token, Authorization, X-Requested-With"} - (if (not-nil? callback) {"Content-Type" "application/javascript;charset=utf-8"} - {"Content-Type" "application/json;charset=utf-8"}) - headers) - :body (if (not-nil? callback) - (wrap-json-in-callback callback (serialize-fn data)) - (serialize-fn data))}) - -(defn exception->json - [ex] - {"error" "Internal Server Error" - "errorMessage" - (let [sw (java.io.StringWriter.)] - (.printStackTrace ex (java.io.PrintWriter. sw)) - (.toString sw))})
