[
https://issues.apache.org/jira/browse/STORM-820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563481#comment-14563481
]
ASF GitHub Bot commented on STORM-820:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/554#discussion_r31266476
--- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
@@ -373,4 +362,1222 @@
(ExecutorStats. (window-set-converter (:emitted stats) str)
(window-set-converter (:transferred stats) str)
specific-stats
- rate)))
\ No newline at end of file
+ rate)))
+
+(defn- agg-bolt-lat-and-count
+ "Aggregates number executed, process latency, and execute latency across
all
+ streams."
+ [idk->exec-avg idk->proc-avg idk->num-executed]
+ {:pre (apply = (map #(set (keys %))
+ [idk->exec-avg
+ idk->proc-avg
+ idk->num-executed]))}
+ (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
+ (if (and avg num-e)
+ (* avg num-e)
+ 0)))]
+ {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
+ :processLatencyTotal (sum (map weight-avg idk->proc-avg))
+ :executed (sum (vals idk->num-executed))}))
+
+(defn- agg-spout-lat-and-count
+ "Aggregates number acked and complete latencies across all streams."
+ [sid->comp-avg sid->num-acked]
+ {:pre (apply = (map #(set (keys %))
+ [sid->comp-avg
+ sid->num-acked]))}
+ (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
+ {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
+ :acked (sum (vals sid->num-acked))}))
+
+(defn add-pairs
+ ([] [0 0])
+ ([[a1 a2] [b1 b2]]
+ [(+ a1 b1) (+ a2 b2)]))
+
+(defn mk-include-sys-fn
+ [include-sys?]
+ (if include-sys?
+ (fn [_] true)
+ (fn [stream] (and (string? stream) (not (system-id? stream))))))
+
+(defn mk-include-sys-filter
+ "Returns a function that includes or excludes map entries whose keys are
+ system ids."
+ [include-sys?]
+ (if include-sys?
+ identity
+ (partial filter-key (mk-include-sys-fn false))))
+
+(defn- agg-bolt-streams-lat-and-count
+ "Aggregates number executed and process & execute latencies."
+ [idk->exec-avg idk->proc-avg idk->executed]
+ {:pre (apply = (map #(set (keys %))
+ [idk->exec-avg
+ idk->proc-avg
+ idk->executed]))}
+ (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
+ (if (and avg num-e)
+ (* avg num-e)
+ 0)))]
+ (into {}
+ (for [k (keys idk->exec-avg)]
+ [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
+ :processLatencyTotal (weight-avg k (idk->proc-avg k))
+ :executed (idk->executed k)}]))))
+
+(defn- agg-spout-streams-lat-and-count
+ "Aggregates number acked and complete latencies."
+ [idk->comp-avg idk->acked]
+ {:pre (apply = (map #(set (keys %))
+ [idk->comp-avg
+ idk->acked]))}
+ (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
+ (if (and avg num-e)
+ (* avg num-e)
+ 0)))]
+ (into {}
+ (for [k (keys idk->comp-avg)]
+ [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
+ :acked (get idk->acked k)}]))))
+
+(defn swap-map-order
+ "For a nested map, rearrange data such that the top-level keys become the
+ nested map's keys and vice versa.
+ Example:
+ {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
+ -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}"
+ [m]
+ (apply merge-with
+ merge
+ (map (fn [[k v]]
+ (into {}
+ (for [[k2 v2] v]
+ [k2 {k v2}])))
+ m)))
+
+(defn- compute-agg-capacity
+ "Computes the capacity metric for one executor given its heartbeat data
and
+ uptime."
+ [m uptime]
+ (when uptime
+ (->>
+ ;; For each stream, create weighted averages and counts.
+ (merge-with (fn weighted-avg+count-fn
+ [avg cnt]
+ [(* avg cnt) cnt])
+ (get (:execute-latencies m) (str TEN-MIN-IN-SECONDS))
+ (get (:executed m) (str TEN-MIN-IN-SECONDS)))
+ vals ;; Ignore the stream ids.
+ (reduce add-pairs
+ [0. 0]) ;; Combine weighted averages and counts.
+ ((fn [[weighted-avg cnt]]
+ (div weighted-avg (* 1000 (min uptime TEN-MIN-IN-SECONDS))))))))
+
+(defn agg-pre-merge-comp-page-bolt
+ [{exec-id :exec-id
+ host :host
+ port :port
+ uptime :uptime
+ comp-id :comp-id
+ num-tasks :num-tasks
+ statk->w->sid->num :stats}
+ window
+ include-sys?]
+ (let [str-key (partial map-key str)
+ handle-sys-components-fn (mk-include-sys-filter include-sys?)]
+ {:executor-id exec-id,
+ :host host,
+ :port port,
+ :uptime uptime,
+ :num-executors 1,
+ :num-tasks num-tasks,
+ :capacity (compute-agg-capacity statk->w->sid->num uptime)
+ :cid+sid->input-stats
+ (merge-with
+ merge
+ (swap-map-order
+ {:acked (-> statk->w->sid->num
+ :acked
+ str-key
+ (get window))
+ :failed (-> statk->w->sid->num
+ :failed
+ str-key
+ (get window))})
+ (agg-bolt-streams-lat-and-count (-> statk->w->sid->num
+ :execute-latencies
+ str-key
+ (get window))
+ (-> statk->w->sid->num
+ :process-latencies
+ str-key
+ (get window))
+ (-> statk->w->sid->num
+ :executed
+ str-key
+ (get window)))),
+ :sid->output-stats
+ (swap-map-order
+ {:emitted (-> statk->w->sid->num
+ :emitted
+ str-key
+ (get window)
+ handle-sys-components-fn)
+ :transferred (-> statk->w->sid->num
+ :transferred
+ str-key
+ (get window)
+ handle-sys-components-fn)})}))
+
+(defn agg-pre-merge-comp-page-spout
+ [{exec-id :exec-id
+ host :host
+ port :port
+ uptime :uptime
+ comp-id :comp-id
+ num-tasks :num-tasks
+ statk->w->sid->num :stats}
+ window
+ include-sys?]
+ (let [str-key (partial map-key str)
+ handle-sys-components-fn (mk-include-sys-filter include-sys?)]
+ {:executor-id exec-id,
+ :host host,
+ :port port,
+ :uptime uptime,
+ :num-executors 1,
+ :num-tasks num-tasks,
+ :sid->output-stats
+ (merge-with
+ merge
+ (agg-spout-streams-lat-and-count (-> statk->w->sid->num
+ :complete-latencies
+ str-key
+ (get window))
+ (-> statk->w->sid->num
+ :acked
+ str-key
+ (get window)))
+ (swap-map-order
+ {:acked (-> statk->w->sid->num
+ :acked
+ str-key
+ (get window))
+ :failed (-> statk->w->sid->num
+ :failed
+ str-key
+ (get window))
+ :emitted (-> statk->w->sid->num
+ :emitted
+ str-key
+ (get window)
+ handle-sys-components-fn)
+ :transferred (-> statk->w->sid->num
+ :transferred
+ str-key
+ (get window)
+ handle-sys-components-fn)}))}))
+
+(defn agg-pre-merge-topo-page-bolt
+ [{comp-id :comp-id
+ num-tasks :num-tasks
+ statk->w->sid->num :stats
+ uptime :uptime}
+ window
+ include-sys?]
+ (let [str-key (partial map-key str)
+ handle-sys-components-fn (mk-include-sys-filter include-sys?)]
+ {comp-id
+ (merge
+ (agg-bolt-lat-and-count (-> statk->w->sid->num
+ :execute-latencies
+ str-key
+ (get window))
+ (-> statk->w->sid->num
+ :process-latencies
+ str-key
+ (get window))
+ (-> statk->w->sid->num
+ :executed
+ str-key
+ (get window)))
+ {:num-executors 1
+ :num-tasks num-tasks
+ :emitted (-> statk->w->sid->num
+ :emitted
+ str-key
+ (get window)
+ handle-sys-components-fn
+ vals
+ sum)
+ :transferred (-> statk->w->sid->num
+ :transferred
+ str-key
+ (get window)
+ handle-sys-components-fn
+ vals
+ sum)
+ :capacity (compute-agg-capacity statk->w->sid->num uptime)
+ :acked (-> statk->w->sid->num
+ :acked
+ str-key
+ (get window)
+ vals
+ sum)
+ :failed (-> statk->w->sid->num
+ :failed
+ str-key
+ (get window)
+ vals
+ sum)})}))
+
+(defn agg-pre-merge-topo-page-spout
+ [{comp-id :comp-id
+ num-tasks :num-tasks
+ statk->w->sid->num :stats}
+ window
+ include-sys?]
+ (let [str-key (partial map-key str)
+ handle-sys-components-fn (mk-include-sys-filter include-sys?)]
+ {comp-id
+ (merge
+ (agg-spout-lat-and-count (-> statk->w->sid->num
+ :complete-latencies
+ str-key
+ (get window))
+ (-> statk->w->sid->num
+ :acked
+ str-key
+ (get window)))
+ {:num-executors 1
+ :num-tasks num-tasks
+ :emitted (-> statk->w->sid->num
+ :emitted
+ str-key
+ (get window)
+ handle-sys-components-fn
+ vals
+ sum)
+ :transferred (-> statk->w->sid->num
+ :transferred
+ str-key
+ (get window)
+ handle-sys-components-fn
+ vals
+ sum)
+ :failed (-> statk->w->sid->num
+ :failed
+ str-key
+ (get window)
+ vals
+ sum)})}))
+
+(defn apply-default
+ [f defaulting-fn & args]
+ (apply f (map defaulting-fn args)))
+
+(defn apply-or-0
+ [f & args]
+ (apply apply-default f #(or % 0) args))
+
+(defn sum-or-0
+ [& args]
+ (apply apply-or-0 + args))
+
+(defn max-or-0
+ [& args]
+ (apply apply-or-0 max args))
+
+(defn merge-agg-comp-stats-comp-page-bolt
+ [{acc-in :cid+sid->input-stats
+ acc-out :sid->output-stats
+ :as acc-bolt-stats}
+ {bolt-in :cid+sid->input-stats
+ bolt-out :sid->output-stats
+ :as bolt-stats}]
+ {:num-executors (inc (or (:num-executors acc-bolt-stats) 0)),
+ :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks
bolt-stats)),
+ :sid->output-stats (merge-with (partial merge-with sum-or-0)
+ acc-out
+ bolt-out),
+ :cid+sid->input-stats (merge-with (partial merge-with sum-or-0)
+ acc-in
+ bolt-in),
+ :executor-stats
+ (let [sum-streams (fn [m k] (->> m vals (map k) (apply sum-or-0)))
+ executed (sum-streams bolt-in :executed)]
+ (conj (:executor-stats acc-bolt-stats)
+ (merge
+ (select-keys bolt-stats
+ [:executor-id :uptime :host :port :capacity])
+ {:emitted (sum-streams bolt-out :emitted)
+ :transferred (sum-streams bolt-out :transferred)
+ :acked (sum-streams bolt-in :acked)
+ :failed (sum-streams bolt-in :failed)
+ :executed executed}
+ (->>
+ (if (and executed (pos? executed))
+ [(div (sum-streams bolt-in :executeLatencyTotal) executed)
+ (div (sum-streams bolt-in :processLatencyTotal)
executed)]
+ [nil nil])
+ (mapcat vector [:execute-latency :process-latency])
+ (apply assoc {})))))})
+
+(defn merge-agg-comp-stats-comp-page-spout
+ [{acc-out :sid->output-stats
+ :as acc-spout-stats}
+ {spout-out :sid->output-stats
+ :as spout-stats}]
+ {:num-executors (inc (or (:num-executors acc-spout-stats) 0)),
+ :num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks
spout-stats)),
+ :sid->output-stats (merge-with (partial merge-with sum-or-0)
+ acc-out
+ spout-out),
+ :executor-stats
+ (let [sum-streams (fn [m k] (->> m vals (map k) (apply sum-or-0)))
+ acked (sum-streams spout-out :acked)]
+ (conj (:executor-stats acc-spout-stats)
+ (merge
+ (select-keys spout-stats [:executor-id :uptime :host :port])
+ {:emitted (sum-streams spout-out :emitted)
+ :transferred (sum-streams spout-out :transferred)
+ :acked acked
+ :failed (sum-streams spout-out :failed)}
+ {:complete-latency (if (and acked (pos? acked))
+ (div (sum-streams spout-out
+ :completeLatencyTotal)
+ acked)
+ nil)})))})
+
+(defn merge-agg-comp-stats-topo-page-bolt
+ [acc-bolt-stats bolt-stats]
+ {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
+ :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks
bolt-stats))
+ :emitted (sum-or-0 (:emitted acc-bolt-stats) (:emitted bolt-stats))
+ :transferred (sum-or-0 (:transferred acc-bolt-stats)
+ (:transferred bolt-stats))
+ :capacity (max-or-0 (:capacity acc-bolt-stats) (:capacity bolt-stats))
+ ;; We sum average latency totals here to avoid dividing at each step.
+ ;; Compute the average latencies by dividing the total by the count.
+ :executeLatencyTotal (sum-or-0 (:executeLatencyTotal acc-bolt-stats)
+ (:executeLatencyTotal bolt-stats))
+ :processLatencyTotal (sum-or-0 (:processLatencyTotal acc-bolt-stats)
+ (:processLatencyTotal bolt-stats))
+ :executed (sum-or-0 (:executed acc-bolt-stats) (:executed bolt-stats))
+ :acked (sum-or-0 (:acked acc-bolt-stats) (:acked bolt-stats))
+ :failed (sum-or-0 (:failed acc-bolt-stats) (:failed bolt-stats))})
+
+(defn merge-agg-comp-stats-topo-page-spout
+ [acc-spout-stats spout-stats]
+ {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
+ :num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks
spout-stats))
+ :emitted (sum-or-0 (:emitted acc-spout-stats) (:emitted spout-stats))
+ :transferred (sum-or-0 (:transferred acc-spout-stats) (:transferred
spout-stats))
+ ;; We sum average latency totals here to avoid dividing at each step.
+ ;; Compute the average latencies by dividing the total by the count.
+ :completeLatencyTotal (sum-or-0 (:completeLatencyTotal acc-spout-stats)
+ (:completeLatencyTotal spout-stats))
+ :acked (sum-or-0 (:acked acc-spout-stats) (:acked spout-stats))
+ :failed (sum-or-0 (:failed acc-spout-stats) (:failed spout-stats))})
+
+(defn aggregate-count-streams
+ [stats]
+ (->> stats
+ (map-val #(reduce + (vals %)))))
+
+(defn- agg-topo-exec-stats*
+ "A helper function that does the common work to aggregate stats of one
+ executor with the given map for the topology page."
+ [window
+ include-sys?
+ {:keys [workers-set
+ bolt-id->stats
+ spout-id->stats
+ window->emitted
+ window->transferred
+ window->comp-lat-wgt-avg
+ window->acked
+ window->failed] :as acc-stats}
+ {:keys [stats] :as new-data}
+ pre-merge-fn
+ merge-fn
+ comp-key]
+ (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
+ {w->compLatWgtAvg :completeLatencyTotal
+ w->acked :acked}
+ (if (:complete-latencies stats)
+ (swap-map-order
+ (into {}
+ (for [w (keys (:acked stats))]
+ [w (agg-spout-lat-and-count
+ (get (:complete-latencies stats) w)
+ (get (:acked stats) w))])))
+ {:completeLatencyTotal nil
+ :acks (aggregate-count-streams (:acked stats))})
+ handle-sys-components-fn (mk-include-sys-filter include-sys?)]
+ (assoc {:workers-set (conj workers-set
+ [(:host new-data) (:port new-data)])
+ :bolt-id->stats bolt-id->stats
+ :spout-id->stats spout-id->stats
+ :window->emitted (->> (:emitted stats)
+ (map-val handle-sys-components-fn)
+ aggregate-count-streams
+ (merge-with + window->emitted))
+ :window->transferred (->> (:transferred stats)
+ (map-val handle-sys-components-fn)
+ aggregate-count-streams
+ (merge-with + window->transferred))
+ :window->comp-lat-wgt-avg (merge-with +
+ window->comp-lat-wgt-avg
+ w->compLatWgtAvg)
+ :window->acked (if (= :spout (:type stats))
+ (merge-with + window->acked w->acked)
+ window->acked)
+ :window->failed (if (= :spout (:type stats))
+ (->> (:failed stats)
+ aggregate-count-streams
+ (merge-with + window->failed))
+ window->failed)}
+ comp-key (merge-with merge-fn
+ (acc-stats comp-key)
+ cid->statk->num)
+ :type (:type stats))))
+
+(defmulti agg-topo-exec-stats
+ "Combines the aggregate stats of one executor with the given map,
selecting
+ the appropriate window and including system components as specified."
+ (fn dispatch-fn [& args] (:type (last args))))
+
+(defmethod agg-topo-exec-stats :bolt
+ [window include-sys? acc-stats new-data]
+ (agg-topo-exec-stats* window
+ include-sys?
+ acc-stats
+ new-data
+ agg-pre-merge-topo-page-bolt
+ merge-agg-comp-stats-topo-page-bolt
+ :bolt-id->stats))
+
+(defmethod agg-topo-exec-stats :spout
+ [window include-sys? acc-stats new-data]
+ (agg-topo-exec-stats* window
+ include-sys?
+ acc-stats
+ new-data
+ agg-pre-merge-topo-page-spout
+ merge-agg-comp-stats-topo-page-spout
+ :spout-id->stats))
+
+(defmethod agg-topo-exec-stats :default [_ _ acc-stats _] acc-stats)
+
+(defn get-last-error
+ [storm-cluster-state storm-id component-id]
+ (if-let [e (.last-error storm-cluster-state storm-id component-id)]
+ (ErrorInfo. (:error e) (:time-secs e))))
+
+(defn component-type
+ "Returns the component type (either :bolt or :spout) for a given
+ topology and component id. Returns nil if not found."
+ [^StormTopology topology id]
+ (let [bolts (.get_bolts topology)
+ spouts (.get_spouts topology)]
+ (cond
+ (.containsKey bolts id) :bolt
+ (.containsKey spouts id) :spout)))
+
+(defn extract-data-from-hb
+ ([exec->host+port task->component beats include-sys? topology comp-id]
+ (for [[[start end :as executor] [host port]] exec->host+port
+ :let [beat (beats executor)
+ id (task->component start)]
+ :when (and (or (nil? comp-id) (= comp-id id))
+ (or include-sys? (not (system-id? id))))]
+ {:exec-id executor
+ :comp-id id
+ :num-tasks (count (range start (inc end)))
+ :host host
+ :port port
+ :uptime (:uptime beat)
+ :stats (:stats beat)
+ :type (or (:type (:stats beat))
+ (component-type topology id))}))
+ ([exec->host+port task->component beats include-sys? topology]
+ (extract-data-from-hb exec->host+port
+ task->component
+ beats
+ include-sys?
+ topology
+ nil)))
+
+(defn aggregate-topo-stats
+ [window include-sys? data]
+ (let [init-val {:workers-set #{}
+ :bolt-id->stats {}
+ :spout-id->stats {}
+ :window->emitted {}
+ :window->transferred {}
+ :window->comp-lat-wgt-avg {}
+ :window->acked {}
+ :window->failed {}}
+ reducer-fn (partial agg-topo-exec-stats
+ window
+ include-sys?)]
+ (reduce reducer-fn init-val data)))
+
+(defn- compute-weighted-averages-per-window
+ [acc-data wgt-avg-key divisor-key]
+ (into {} (for [[window wgt-avg] (wgt-avg-key acc-data)
+ :let [divisor ((divisor-key acc-data) window)]
+ :when (and divisor (pos? divisor))]
+ [(str window) (div wgt-avg divisor)])))
+
+(defn- post-aggregate-topo-stats
+ [task->component exec->node+port last-err-fn acc-data]
+ {:num-tasks (count task->component)
+ :num-workers (count (:workers-set acc-data))
+ :num-executors (count exec->node+port)
+ :bolt-id->stats
+ (into {} (for [[id m] (:bolt-id->stats acc-data)
+ :let [executed (:executed m)]]
+ [id (-> m
+ (assoc :execute-latency
+ (if (and executed (pos? executed))
+ (div (or (:executeLatencyTotal m) 0)
+ executed)
+ 0)
+ :process-latency
+ (if (and executed (pos? executed))
+ (div (or (:processLatencyTotal m) 0)
+ executed)
+ 0))
+ (dissoc :executeLatencyTotal
+ :processLatencyTotal)
+ (assoc :lastError (last-err-fn id)))]))
+ :spout-id->stats
+ (into {} (for [[id m] (:spout-id->stats acc-data)
+ :let [acked (:acked m)]]
+ [id (-> m
+ (assoc :completeLatency
+ (if (and acked (pos? acked))
+ (div (:completeLatencyTotal m)
+ (:acked m))
+ 0))
+ (dissoc :completeLatencyTotal)
+ (assoc :lastError (last-err-fn id)))]))
+ :window->emitted (map-key str (:window->emitted acc-data))
+ :window->transferred (map-key str (:window->transferred acc-data))
+ :window->complete-latency
+ (compute-weighted-averages-per-window acc-data
+ :window->comp-lat-wgt-avg
+ :window->acked)
+ :window->acked (map-key str (:window->acked acc-data))
+ :window->failed (map-key str (:window->failed acc-data))})
+
+(defn- thriftify-common-agg-stats
+ [^ComponentAggregateStats s
+ {:keys [num-tasks
+ emitted
+ transferred
+ acked
+ failed
+ num-executors] :as statk->num}]
+ (let [cas (CommonAggregateStats.)]
+ (and num-executors (.set_num_executors cas num-executors))
+ (and num-tasks (.set_num_tasks cas num-tasks))
+ (and emitted (.set_emitted cas emitted))
+ (and transferred (.set_transferred cas transferred))
+ (and acked (.set_acked cas acked))
+ (and failed (.set_failed cas failed))
+ (.set_common_stats s cas)))
+
+(defn thriftify-bolt-agg-stats
+ [statk->num]
+ (let [{:keys [lastError
+ execute-latency
+ process-latency
+ executed
+ capacity]} statk->num
+ s (ComponentAggregateStats.)]
+ (.set_type s ComponentType/BOLT)
+ (and lastError (.set_last_error s lastError))
+ (thriftify-common-agg-stats s statk->num)
+ (.set_specific_stats s
+ (SpecificAggregateStats/bolt
+ (let [bas (BoltAggregateStats.)]
+ (and execute-latency (.set_execute_latency_ms bas
execute-latency))
+ (and process-latency (.set_process_latency_ms bas
process-latency))
+ (and executed (.set_executed bas executed))
+ (and capacity (.set_capacity bas capacity))
+ bas)))
+ s))
+
+(defn thriftify-spout-agg-stats
+ [statk->num]
+ (let [{:keys [lastError
+ complete-latency]} statk->num
+ s (ComponentAggregateStats.)]
+ (.set_type s ComponentType/SPOUT)
+ (and lastError (.set_last_error s lastError))
+ (thriftify-common-agg-stats s statk->num)
+ (.set_specific_stats s
+ (SpecificAggregateStats/spout
+ (let [sas (SpoutAggregateStats.)]
+ (and complete-latency (.set_complete_latency_ms sas
complete-latency))
+ sas)))
+ s))
+
+(defn thriftify-topo-page-data
+ [topology-id data]
+ (let [{:keys [num-tasks
+ num-workers
+ num-executors
+ spout-id->stats
+ bolt-id->stats
+ window->emitted
+ window->transferred
+ window->complete-latency
+ window->acked
+ window->failed]} data
+ spout-agg-stats (into {}
+ (for [[id m] spout-id->stats
+ :let [m (assoc m :type :spout)]]
+ [id
+ (thriftify-spout-agg-stats m)]))
+ bolt-agg-stats (into {}
+ (for [[id m] bolt-id->stats
+ :let [m (assoc m :type :bolt)]]
+ [id
+ (thriftify-bolt-agg-stats m)]))
+ topology-stats (doto (TopologyStats.)
+ (.set_window_to_emitted window->emitted)
+ (.set_window_to_transferred window->transferred)
+ (.set_window_to_complete_latencies_ms
+ window->complete-latency)
+ (.set_window_to_acked window->acked)
+ (.set_window_to_failed window->failed))
+ topo-page-info (doto (TopologyPageInfo. topology-id)
+ (.set_num_tasks num-tasks)
+ (.set_num_workers num-workers)
+ (.set_num_executors num-executors)
+ (.set_id_to_spout_agg_stats spout-agg-stats)
+ (.set_id_to_bolt_agg_stats bolt-agg-stats)
+ (.set_topology_stats topology-stats))]
+ topo-page-info))
+
+(defn agg-topo-execs-stats
+ "Aggregate various executor statistics for a topology from the given
+ heartbeats."
+ [topology-id
+ exec->node+port
+ task->component
+ beats
+ topology
+ window
+ include-sys?
+ last-err-fn]
+ (->> ;; This iterates over each executor one time, because of lazy
evaluation.
+ (extract-data-from-hb exec->node+port
+ task->component
+ beats
+ include-sys?
+ topology)
+ (aggregate-topo-stats window include-sys?)
+ (post-aggregate-topo-stats task->component exec->node+port last-err-fn)
+ (thriftify-topo-page-data topology-id)))
+
+(defn- agg-bolt-exec-win-stats
+ "A helper function that aggregates windowed stats from one bolt
executor."
+ [acc-stats new-stats include-sys?]
+ (let [{w->execLatWgtAvg :executeLatencyTotal
+ w->procLatWgtAvg :processLatencyTotal
+ w->executed :executed}
+ (swap-map-order
+ (into {} (for [w (keys (:executed new-stats))]
+ [w (agg-bolt-lat-and-count
+ (get (:execute-latencies new-stats) w)
+ (get (:process-latencies new-stats) w)
+ (get (:executed new-stats) w))])))
+ handle-sys-components-fn (mk-include-sys-filter include-sys?)]
+ {:window->emitted (->> (:emitted new-stats)
+ (map-val handle-sys-components-fn)
+ aggregate-count-streams
+ (merge-with + (:window->emitted acc-stats)))
+ :window->transferred (->> (:transferred new-stats)
+ (map-val handle-sys-components-fn)
+ aggregate-count-streams
+ (merge-with + (:window->transferred
acc-stats)))
+ :window->exec-lat-wgt-avg (merge-with +
+ (:window->exec-lat-wgt-avg
acc-stats)
+ w->execLatWgtAvg)
+ :window->proc-lat-wgt-avg (merge-with +
+ (:window->proc-lat-wgt-avg
acc-stats)
+ w->procLatWgtAvg)
+ :window->executed (merge-with + (:window->executed acc-stats)
w->executed)
+ :window->acked (->> (:acked new-stats)
+ aggregate-count-streams
+ (merge-with + (:window->acked acc-stats)))
+ :window->failed (->> (:failed new-stats)
+ aggregate-count-streams
+ (merge-with + (:window->failed acc-stats)))}))
+
+(defn- agg-spout-exec-win-stats
+ "A helper function that aggregates windowed stats from one spout
executor."
+ [acc-stats new-stats include-sys?]
+ (let [{w->compLatWgtAvg :completeLatencyTotal
+ w->acked :acked}
+ (swap-map-order
+ (into {} (for [w (keys (:acked new-stats))]
+ [w (agg-spout-lat-and-count
+ (get (:complete-latencies new-stats) w)
+ (get (:acked new-stats) w))])))
+ handle-sys-components-fn (mk-include-sys-filter include-sys?)]
+ {:window->emitted (->> (:emitted new-stats)
+ (map-val handle-sys-components-fn)
+ aggregate-count-streams
+ (merge-with + (:window->emitted acc-stats)))
+ :window->transferred (->> (:transferred new-stats)
+ (map-val handle-sys-components-fn)
+ aggregate-count-streams
+ (merge-with + (:window->transferred
acc-stats)))
+ :window->comp-lat-wgt-avg (merge-with +
+ (:window->comp-lat-wgt-avg
acc-stats)
+ w->compLatWgtAvg)
+ :window->acked (->> (:acked new-stats)
+ aggregate-count-streams
+ (merge-with + (:window->acked acc-stats)))
+ :window->failed (->> (:failed new-stats)
+ aggregate-count-streams
+ (merge-with + (:window->failed acc-stats)))}))
+
+(defmulti agg-comp-exec-stats
+ "Combines the aggregate stats of one executor with the given map,
selecting
+ the appropriate window and including system components as specified."
+ (fn dispatch-fn [_ _ init-val _] (:type init-val)))
+
+(defmethod agg-comp-exec-stats :bolt
+ [window include-sys? acc-stats new-data]
+ (assoc (agg-bolt-exec-win-stats acc-stats (:stats new-data) include-sys?)
+ :stats (merge-agg-comp-stats-comp-page-bolt
+ (:stats acc-stats)
+ (agg-pre-merge-comp-page-bolt new-data window
include-sys?))
+ :type :bolt))
+
+(defmethod agg-comp-exec-stats :spout
+ [window include-sys? acc-stats new-data]
+ (assoc (agg-spout-exec-win-stats acc-stats (:stats new-data)
include-sys?)
+ :stats (merge-agg-comp-stats-comp-page-spout
+ (:stats acc-stats)
+ (agg-pre-merge-comp-page-spout new-data window
include-sys?))
+ :type :spout))
+
+(defn- aggregate-comp-stats*
+ [window include-sys? data init-val]
+ (-> (partial agg-comp-exec-stats
+ window
+ include-sys?)
+ (reduce init-val data)))
+
+(defmulti aggregate-comp-stats
+ (fn dispatch-fn [& args] (-> args last first :stats :type)))
--- End diff --
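This could result in a nil dispatch value if the component's heartbeat
does not contain metrics: `:stats` is then nil, so
`(-> args last first :stats :type)` evaluates to nil. We should be using the
`:type` directly instead of accessing it via `:stats`, i.e.
`(-> args last first :type)`. Each record built by `extract-data-from-hb`
carries a top-level `:type` that falls back to `component-type` when the
heartbeat stats do not provide one.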
> UI Topology & Component Pages have long load times with large,
> highly-connected Topologies
> ------------------------------------------------------------------------------------------
>
> Key: STORM-820
> URL: https://issues.apache.org/jira/browse/STORM-820
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.11.0
> Reporter: Derek Dagit
> Assignee: Derek Dagit
>
> In the UI, the Topology Page and the Component Page each make a
> getTopologyInfoWithOpts thrift call to nimbus for executor heartbeat data.
> Metrics from this data are then aggregated by the UI daemon for display.
> When large topologies, with high-connectedness, are viewed in this way, the
> load times for each page can be minutes long. In addition, heap usage by the
> nimbus JVM can grow substantially as data for each executor, component, &
> stream is serialized to be sent to the UI.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)