http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj 
b/storm-core/src/clj/org/apache/storm/stats.clj
new file mode 100644
index 0000000..68b16fd
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@ -0,0 +1,1521 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.stats
+  (:import [org.apache.storm.generated Nimbus Nimbus$Processor Nimbus$Iface 
StormTopology ShellComponent
+            NotAliveException AlreadyAliveException InvalidTopologyException 
GlobalStreamId
+            ClusterSummary TopologyInfo TopologySummary ExecutorInfo 
ExecutorSummary ExecutorStats
+            ExecutorSpecificStats SpoutStats BoltStats ErrorInfo
+            SupervisorSummary CommonAggregateStats ComponentAggregateStats
+            ComponentPageInfo ComponentType BoltAggregateStats
+            ExecutorAggregateStats SpecificAggregateStats
+            SpoutAggregateStats TopologyPageInfo TopologyStats])
+  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.metric.internal MultiCountStatAndMetric 
MultiLatencyStatAndMetric])
+  (:use [org.apache.storm log util])
+  (:use [clojure.math.numeric-tower :only [ceil]]))
+
+;; Number of seconds in ten minutes (600). Its string form is used as the
+;; window key when executor capacity is computed.
+(def TEN-MIN-IN-SECONDS (* 10 60))
+
+;; Keys of the CommonStats fields that hold stat objects; these are the
+;; fields iterated when the common stats are closed or read out.
+(def COMMON-FIELDS [:emitted :transferred])
+;; Stats kept for every executor: an emitted counter, a transferred counter
+;; and the `rate` value those counters are incremented by.
+(defrecord CommonStats [^MultiCountStatAndMetric emitted
+                        ^MultiCountStatAndMetric transferred
+                        rate])
+
+;; Keys of the bolt-specific stat fields of BoltExecutorStats (everything
+;; except :common).
+(def BOLT-FIELDS [:acked :failed :process-latencies :executed 
:execute-latencies])
+;;acked and failed count individual tuples
+;; Bolt executor stats: the shared CommonStats plus count stats (acked,
+;; failed, executed) and latency stats (process, execute).
+(defrecord BoltExecutorStats [^CommonStats common
+                              ^MultiCountStatAndMetric acked
+                              ^MultiCountStatAndMetric failed
+                              ^MultiLatencyStatAndMetric process-latencies
+                              ^MultiCountStatAndMetric executed
+                              ^MultiLatencyStatAndMetric execute-latencies])
+
+;; Keys of the spout-specific stat fields of SpoutExecutorStats (everything
+;; except :common).
+(def SPOUT-FIELDS [:acked :failed :complete-latencies])
+;;acked and failed count tuple completion
+;; Spout executor stats: the shared CommonStats plus acked/failed count stats
+;; and a complete-latency stat.
+(defrecord SpoutExecutorStats [^CommonStats common
+                               ^MultiCountStatAndMetric acked
+                               ^MultiCountStatAndMetric failed
+                               ^MultiLatencyStatAndMetric complete-latencies])
+
+;; Constructor argument handed to every MultiCountStatAndMetric and
+;; MultiLatencyStatAndMetric created in this namespace.
+(def NUM-STAT-BUCKETS 20)
+
+(defn- mk-common-stats
+  "Builds a CommonStats holding fresh emitted and transferred counters (each
+  constructed with NUM-STAT-BUCKETS) and the given rate."
+  [rate]
+  (let [emitted (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+        transferred (MultiCountStatAndMetric. NUM-STAT-BUCKETS)]
+    (->CommonStats emitted transferred rate)))
+
+(defn mk-bolt-stats
+  "Builds a BoltExecutorStats whose common stats use `rate` and whose
+  bolt-specific fields are fresh count/latency stat objects."
+  [rate]
+  (let [new-count #(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+        new-latency #(MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)
+        common (mk-common-stats rate)
+        acked (new-count)
+        failed (new-count)
+        process-latencies (new-latency)
+        executed (new-count)
+        execute-latencies (new-latency)]
+    (->BoltExecutorStats common
+                         acked
+                         failed
+                         process-latencies
+                         executed
+                         execute-latencies)))
+
+(defn mk-spout-stats
+  "Builds a SpoutExecutorStats whose common stats use `rate` and whose
+  spout-specific fields are fresh count/latency stat objects."
+  [rate]
+  (let [common (mk-common-stats rate)
+        acked (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+        failed (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+        complete-latencies (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)]
+    (->SpoutExecutorStats common acked failed complete-latencies)))
+
+(defmacro stats-rate
+  "Expands to a lookup of :rate inside the :common entry of `stats`."
+  [stats]
+  `(:rate (:common ~stats)))
+
+(defmacro stats-emitted
+  "Expands to a lookup of :emitted inside the :common entry of `stats`."
+  [stats]
+  `(:emitted (:common ~stats)))
+
+(defmacro stats-transferred
+  "Expands to a lookup of :transferred inside the :common entry of `stats`."
+  [stats]
+  `(:transferred (:common ~stats)))
+
+(defmacro stats-executed
+  "Expands to a lookup of :executed in `stats`."
+  [stats]
+  `(get ~stats :executed))
+
+(defmacro stats-acked
+  "Expands to a lookup of :acked in `stats`."
+  [stats]
+  `(get ~stats :acked))
+
+(defmacro stats-failed
+  "Expands to a lookup of :failed in `stats`."
+  [stats]
+  `(get ~stats :failed))
+
+(defmacro stats-execute-latencies
+  "Expands to a lookup of :execute-latencies in `stats`."
+  [stats]
+  `(get ~stats :execute-latencies))
+
+(defmacro stats-process-latencies
+  "Expands to a lookup of :process-latencies in `stats`."
+  [stats]
+  `(get ~stats :process-latencies))
+
+(defmacro stats-complete-latencies
+  "Expands to a lookup of :complete-latencies in `stats`."
+  [stats]
+  `(get ~stats :complete-latencies))
+
+(defn emitted-tuple!
+  "Increments the emitted counter of `stats` for `stream` by the stats' rate."
+  [stats stream]
+  (let [^MultiCountStatAndMetric emitted (stats-emitted stats)]
+    (.incBy emitted ^Object stream ^long (stats-rate stats))))
+
+(defn transferred-tuples!
+  "Increments the transferred counter of `stats` for `stream` by
+  rate * amt."
+  [stats stream amt]
+  (let [^MultiCountStatAndMetric transferred (stats-transferred stats)]
+    (.incBy transferred ^Object stream ^long (* (stats-rate stats) amt))))
+
+(defn bolt-execute-tuple!
+  "Records one executed tuple under the key [component stream]: bumps the
+  executed counter by the stats' rate and records `latency-ms` in the
+  execute-latency stat."
+  [^BoltExecutorStats stats component stream latency-ms]
+  (let [cid+sid [component stream]
+        ^MultiCountStatAndMetric executed-counter (stats-executed stats)
+        ^MultiLatencyStatAndMetric latency-stat (stats-execute-latencies stats)]
+    (.incBy executed-counter cid+sid (stats-rate stats))
+    (.record latency-stat cid+sid latency-ms)))
+
+(defn bolt-acked-tuple!
+  "Records one acked tuple under the key [component stream]: bumps the acked
+  counter by the stats' rate and records `latency-ms` in the process-latency
+  stat."
+  [^BoltExecutorStats stats component stream latency-ms]
+  (let [cid+sid [component stream]
+        ^MultiCountStatAndMetric acked-counter (stats-acked stats)
+        ^MultiLatencyStatAndMetric latency-stat (stats-process-latencies stats)]
+    (.incBy acked-counter cid+sid (stats-rate stats))
+    (.record latency-stat cid+sid latency-ms)))
+
+(defn bolt-failed-tuple!
+  "Bumps the failed counter for the key [component stream] by the stats'
+  rate. `latency-ms` is accepted but not used."
+  [^BoltExecutorStats stats component stream latency-ms]
+  (let [cid+sid [component stream]
+        ^MultiCountStatAndMetric failed-counter (stats-failed stats)]
+    (.incBy failed-counter cid+sid (stats-rate stats))))
+
+(defn spout-acked-tuple!
+  "Records one acked spout tuple for `stream`: bumps the acked counter by the
+  stats' rate and records `latency-ms` in the complete-latency stat."
+  [^SpoutExecutorStats stats stream latency-ms]
+  (let [^MultiCountStatAndMetric acked-counter (stats-acked stats)
+        ^MultiLatencyStatAndMetric latency-stat (stats-complete-latencies stats)]
+    (.incBy acked-counter stream (stats-rate stats))
+    (.record latency-stat stream latency-ms)))
+
+(defn spout-failed-tuple!
+  "Bumps the failed counter for `stream` by the stats' rate. `latency-ms` is
+  accepted but not used."
+  [^SpoutExecutorStats stats stream latency-ms]
+  (let [^MultiCountStatAndMetric failed-counter (stats-failed stats)]
+    (.incBy failed-counter stream (stats-rate stats))))
+
+;; Calls .close on a single stat object. The argument carries no type hint,
+;; so the call is resolved reflectively.
+(defn- cleanup-stat! [stat]
+  (.close stat))
+
+(defn- cleanup-common-stats!
+  "Closes every stat object stored under COMMON-FIELDS in `stats`."
+  [^CommonStats stats]
+  (doseq [stat (map #(% stats) COMMON-FIELDS)]
+    (cleanup-stat! stat)))
+
+(defn cleanup-bolt-stats!
+  "Closes the common stat objects of a bolt and then every stat object stored
+  under BOLT-FIELDS."
+  [^BoltExecutorStats stats]
+  (cleanup-common-stats! (:common stats))
+  (doseq [stat (map #(% stats) BOLT-FIELDS)]
+    (cleanup-stat! stat)))
+
+(defn cleanup-spout-stats!
+  "Closes the common stat objects of a spout and then every stat object
+  stored under SPOUT-FIELDS."
+  [^SpoutExecutorStats stats]
+  (cleanup-common-stats! (:common stats))
+  (doseq [stat (map #(% stats) SPOUT-FIELDS)]
+    (cleanup-stat! stat)))
+
+(defn- value-stats
+  "Returns a map from each field in `fields` to the value read from the stat
+  object stored under that field in `stats`: .getTimeCounts for a
+  MultiCountStatAndMetric, .getTimeLatAvg otherwise."
+  [stats fields]
+  (reduce (fn [acc field]
+            (let [stat (field stats)]
+              (assoc acc
+                     field
+                     (if (instance? MultiCountStatAndMetric stat)
+                       (.getTimeCounts ^MultiCountStatAndMetric stat)
+                       (.getTimeLatAvg ^MultiLatencyStatAndMetric stat)))))
+          {}
+          fields))
+
+(defn- value-common-stats
+  "Returns the values of the COMMON-FIELDS stats of `stats` together with its
+  :rate."
+  [^CommonStats stats]
+  (assoc (value-stats stats COMMON-FIELDS)
+         :rate (:rate stats)))
+
+(defn value-bolt-stats!
+  "Runs cleanup-bolt-stats! on `stats` (which calls .close on every stat
+  object) and then returns one flat map holding the common values, the
+  BOLT-FIELDS values and :type :bolt."
+  [^BoltExecutorStats stats]
+  (cleanup-bolt-stats! stats)
+  (-> (value-common-stats (:common stats))
+      (merge (value-stats stats BOLT-FIELDS))
+      (assoc :type :bolt)))
+
+(defn value-spout-stats!
+  "Runs cleanup-spout-stats! on `stats` (which calls .close on every stat
+  object) and then returns one flat map holding the common values, the
+  SPOUT-FIELDS values and :type :spout."
+  [^SpoutExecutorStats stats]
+  (cleanup-spout-stats! stats)
+  (-> (value-common-stats (:common stats))
+      (merge (value-stats stats SPOUT-FIELDS))
+      (assoc :type :spout)))
+
+;; Turns an executor stats record into a plain map of values. Dispatch goes
+;; through class-selector (defined outside this file section; presumably it
+;; selects on the argument's class -- confirm against its definition).
+(defmulti render-stats! class-selector)
+
+;; Spout records are rendered by value-spout-stats!.
+(defmethod render-stats! SpoutExecutorStats
+  [stats]
+  (value-spout-stats! stats))
+
+;; Bolt records are rendered by value-bolt-stats!.
+(defmethod render-stats! BoltExecutorStats
+  [stats]
+  (value-bolt-stats! stats))
+
+;; Converts a rendered stats map into thrift ExecutorSpecificStats;
+;; dispatches on the map's :type (:bolt or :spout).
+(defmulti thriftify-specific-stats :type)
+;; Converts thrift BoltStats/SpoutStats back into Clojure data; dispatches
+;; through class-selector (defined outside this file section).
+(defmulti clojurify-specific-stats class-selector)
+
+(defn window-set-converter
+  "Rebuilds a two-level map, applying `first-key-fun` to every top-level key
+  and `key-fn` to every key of the nested maps; values are left untouched.
+  The two-argument arity keeps nested keys as they are (key-fn = identity)."
+  ([stats key-fn first-key-fun]
+    (let [convert-inner (fn [inner]
+                          (into {}
+                                (map (fn [[inner-k v]] [(key-fn inner-k) v])
+                                     inner)))]
+      (into {}
+            (map (fn [[outer-k inner]]
+                   ;; first-key-fun is applied to the outer key only
+                   [(first-key-fun outer-k) (convert-inner inner)])
+                 stats))))
+  ([stats first-key-fun]
+    (window-set-converter stats identity first-key-fun)))
+
+(defn to-global-stream-id
+  "Builds a thrift GlobalStreamId from a [component stream] pair."
+  [[component stream]]
+  (new GlobalStreamId component stream))
+
+(defn from-global-stream-id
+  "Returns the [component-id stream-id] pair read from a GlobalStreamId."
+  [global-stream-id]
+  (vector (.get_componentId global-stream-id)
+          (.get_streamId global-stream-id)))
+
+(defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
+  ;; Returns [acked failed process-ms-avg executed execute-ms-avg]; in each
+  ;; map the nested GlobalStreamId keys become [component stream] pairs and
+  ;; the window keys are kept as they are.
+  (mapv #(window-set-converter % from-global-stream-id identity)
+        [(.get_acked stats)
+         (.get_failed stats)
+         (.get_process_ms_avg stats)
+         (.get_executed stats)
+         (.get_execute_ms_avg stats)]))
+
+(defmethod clojurify-specific-stats SpoutStats [^SpoutStats stats]
+  ;; Returns [acked failed complete-ms-avg] exactly as stored in the thrift
+  ;; object; no key conversion is applied.
+  (vector (.get_acked stats)
+          (.get_failed stats)
+          (.get_complete_ms_avg stats)))
+
+
+(defn clojurify-executor-stats
+  "Converts thrift ExecutorStats into the flattened form that render-stats!
+  produces: emitted, transferred and rate merged with the bolt- or
+  spout-specific values and a :type entry, with no :common key."
+  [^ExecutorStats stats]
+  (let [specific (.get_specific stats)
+        bolt? (.is_set_bolt specific)
+        field-vals (clojurify-specific-stats (if bolt?
+                                               (.get_bolt specific)
+                                               (.get_spout specific)))
+        common (CommonStats. (.get_emitted stats)
+                             (.get_transferred stats)
+                             (.get_rate stats))
+        [type-kw make-record] (if bolt?
+                                [:bolt ->BoltExecutorStats]
+                                [:spout ->SpoutExecutorStats])]
+    ;; The worker heartbeat does not store BoltExecutorStats or
+    ;; SpoutExecutorStats; it stores the result of render-stats!, which lifts
+    ;; everything inside :common to the top-level map. The same shape is
+    ;; rebuilt here: the record is created with a nil :common, merged over
+    ;; the common values, and the :common key is dropped.
+    (-> common
+        (merge {:type type-kw}
+               (apply make-record (into [nil] field-vals)))
+        (dissoc :common))))
+
+(defmethod thriftify-specific-stats :bolt
+  [stats]
+  ;; Window keys become strings and [component stream] keys become
+  ;; GlobalStreamIds before the maps are handed to the BoltStats constructor.
+  (let [to-thrift (fn [statk]
+                    (window-set-converter (statk stats)
+                                          to-global-stream-id
+                                          str))]
+    (ExecutorSpecificStats/bolt
+      (BoltStats. (to-thrift :acked)
+                  (to-thrift :failed)
+                  (to-thrift :process-latencies)
+                  (to-thrift :executed)
+                  (to-thrift :execute-latencies)))))
+
+(defmethod thriftify-specific-stats :spout
+  [stats]
+  ;; Only the window keys are converted (to strings); stream keys are kept.
+  (let [to-thrift (fn [statk] (window-set-converter (statk stats) str))]
+    (ExecutorSpecificStats/spout
+      (SpoutStats. (to-thrift :acked)
+                   (to-thrift :failed)
+                   (to-thrift :complete-latencies)))))
+
+(defn thriftify-executor-stats
+  "Builds thrift ExecutorStats from a rendered stats map: emitted and
+  transferred with stringified window keys, the type-specific stats, and the
+  rate."
+  [stats]
+  (let [specific (thriftify-specific-stats stats)
+        str-windows (fn [statk] (window-set-converter (statk stats) str))]
+    (ExecutorStats. (str-windows :emitted)
+                    (str-windows :transferred)
+                    specific
+                    (:rate stats))))
+
+(defn valid-number?
+  "Returns true if x is a number that is not NaN or Infinity, false otherwise"
+  [x]
+  (and (number? x)
+       (not (or (Double/isNaN x)
+                (Double/isInfinite x)))))
+
+(defn apply-default
+  "Applies f to args after passing each arg through defaulting-fn."
+  [f defaulting-fn & args]
+  (->> args
+       (map defaulting-fn)
+       (apply f)))
+
+(defn apply-or-0
+  "Applies f to args after replacing every arg that fails valid-number?
+  with 0."
+  [f & args]
+  (let [number-or-0 (fn [x] (if (valid-number? x) x 0))]
+    (apply apply-default f number-or-0 args)))
+
+;; Sums the arguments, counting every argument that fails valid-number?
+;; (nil, NaN, infinity, non-numbers) as 0.
+(defn sum-or-0
+  [& args]
+  (apply apply-or-0 + args))
+
+;; Multiplies the arguments, counting every argument that fails
+;; valid-number? as 0 -- so one invalid factor makes the product 0.
+(defn product-or-0
+  [& args]
+  (apply apply-or-0 * args))
+
+;; Returns the maximum of the arguments, counting every argument that fails
+;; valid-number? as 0. At least one argument is needed, because the
+;; underlying `max` has no zero-argument arity.
+(defn max-or-0
+  [& args]
+  (apply apply-or-0 max args))
+
+(defn- agg-bolt-lat-and-count
+  "Aggregates number executed, process latency, and execute latency across
+  all streams. Each average is weighted by the executed count stored under
+  the same key, and the weighted values are summed."
+  [idk->exec-avg idk->proc-avg idk->num-executed]
+  (let [weighted-total (fn [idk->avg]
+                         (sum (for [[id avg] idk->avg]
+                                (product-or-0 avg
+                                              (get idk->num-executed id)))))]
+    {:executeLatencyTotal (weighted-total idk->exec-avg)
+     :processLatencyTotal (weighted-total idk->proc-avg)
+     :executed (sum (vals idk->num-executed))}))
+
+(defn- agg-spout-lat-and-count
+  "Aggregates number acked and complete latencies across all streams. Each
+  average is weighted by the acked count stored under the same stream id."
+  [sid->comp-avg sid->num-acked]
+  (let [weighted (for [[sid avg] sid->comp-avg]
+                   (product-or-0 avg (get sid->num-acked sid)))]
+    {:completeLatencyTotal (sum weighted)
+     :acked (sum (vals sid->num-acked))}))
+
+(defn add-pairs
+  "Adds two pairs element-wise. With no arguments returns the identity
+  pair [0 0]."
+  ([] [0 0])
+  ([left right]
+   (let [[l1 l2] left
+         [r1 r2] right]
+     (vector (+ l1 r1) (+ l2 r2)))))
+
+(defn mk-include-sys-fn
+  "Returns a one-argument predicate over stream ids. When include-sys? is
+  truthy the predicate accepts everything; otherwise it accepts only strings
+  for which Utils/isSystemId is false."
+  [include-sys?]
+  (if-not include-sys?
+    (fn [stream]
+      (if (string? stream)
+        (not (Utils/isSystemId stream))
+        false))
+    (fn [_] true)))
+
+(defn mk-include-sys-filter
+  "Returns a function that includes or excludes map entries whose keys are
+  system ids."
+  [include-sys?]
+  (if include-sys?
+    identity
+    (let [non-system? (mk-include-sys-fn false)]
+      (fn [m] (filter-key non-system? m)))))
+
+(defn- agg-bolt-streams-lat-and-count
+  "Aggregates number executed and process & execute latencies.
+  Returns a map from every key of idk->exec-avg to
+  {:executeLatencyTotal, :processLatencyTotal, :executed}, where each latency
+  total is the average weighted by the executed count for that key.
+  Executed counts are read with `get`, so a nil idk->executed map yields nil
+  counts (and 0 totals) instead of a NullPointerException; this matches
+  agg-spout-streams-lat-and-count."
+  [idk->exec-avg idk->proc-avg idk->executed]
+  (letfn [(weight-avg [id avg]
+            (product-or-0 avg (get idk->executed id)))]
+    (into {}
+      (for [k (keys idk->exec-avg)]
+        [k {:executeLatencyTotal (weight-avg k (get idk->exec-avg k))
+            :processLatencyTotal (weight-avg k (get idk->proc-avg k))
+            :executed (get idk->executed k)}]))))
+
+(defn- agg-spout-streams-lat-and-count
+  "Aggregates number acked and complete latencies. Returns a map from every
+  key of idk->comp-avg to {:completeLatencyTotal, :acked}, where the latency
+  total is the average weighted by the acked count for that key."
+  [idk->comp-avg idk->acked]
+  (into {}
+        (map (fn [[k comp-avg]]
+               (let [num-acked (get idk->acked k)]
+                 [k {:completeLatencyTotal (product-or-0 comp-avg num-acked)
+                     :acked num-acked}]))
+             idk->comp-avg)))
+
+(defn swap-map-order
+  "For a nested map, rearrange data such that the top-level keys become the
+  nested map's keys and vice versa.
+  Example:
+  {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
+  -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}"
+  [m]
+  (let [invert-entry (fn [[outer-k inner]]
+                       ;; {inner-k {outer-k v}} for one top-level entry
+                       (reduce (fn [acc [inner-k v]]
+                                 (assoc acc inner-k {outer-k v}))
+                               {}
+                               inner))]
+    (->> m
+         (map invert-entry)
+         (apply merge-with merge))))
+
+(defn- compute-agg-capacity
+  "Computes the capacity metric for one executor given its heartbeat data and
+  uptime: the execute latencies of the ten-minute window, each weighted by
+  the executed count of the same stream, summed and divided by the window
+  length in milliseconds (uptime capped at TEN-MIN-IN-SECONDS).
+  Returns nil when uptime is nil and 0.0 when the window length is not
+  positive (e.g. uptime 0), which would otherwise divide by zero."
+  [m uptime]
+  (when uptime
+    (let [window (str TEN-MIN-IN-SECONDS)
+          sid->exec-avg (get (:execute-latencies m) window)
+          sid->executed (get (:executed m) window)
+          ;; Look each count up by stream id rather than merging the two
+          ;; maps: a stream present in only one of them contributes 0 instead
+          ;; of leaving an unpaired number behind. product-or-0 also counts
+          ;; nil/NaN averages or counts as 0.
+          total-exec-ms (reduce +
+                                0.
+                                (for [[sid avg] sid->exec-avg]
+                                  (product-or-0 avg (get sid->executed sid))))
+          window-ms (* 1000 (min uptime TEN-MIN-IN-SECONDS))]
+      (if (pos? window-ms)
+        (div total-exec-ms window-ms)
+        0.))))
+
+(defn agg-pre-merge-comp-page-bolt
+  "Builds the pre-merge component-page map for one bolt executor from its
+  heartbeat data, for the given window: executor identity fields, capacity,
+  per-input stats (:cid+sid->input-stats) and per-output-stream stats
+  (:sid->output-stats). The include-sys? filter is applied to the output
+  stats only."
+  [{exec-id :exec-id
+    host :host
+    port :port
+    uptime :uptime
+    comp-id :comp-id
+    num-tasks :num-tasks
+    statk->w->sid->num :stats}
+   window
+   include-sys?]
+  (let [drop-sys (mk-include-sys-filter include-sys?)
+        ;; the stat's map for `window`, after stringifying the window keys
+        in-window (fn [statk]
+                    (get (map-key str (statk statk->w->sid->num)) window))
+        ack+fail (swap-map-order {:acked (in-window :acked)
+                                  :failed (in-window :failed)})
+        lat+count (agg-bolt-streams-lat-and-count
+                    (in-window :execute-latencies)
+                    (in-window :process-latencies)
+                    (in-window :executed))
+        out-stats (swap-map-order
+                    {:emitted (drop-sys (in-window :emitted))
+                     :transferred (drop-sys (in-window :transferred))})]
+    {:executor-id exec-id
+     :host host
+     :port port
+     :uptime uptime
+     :num-executors 1
+     :num-tasks num-tasks
+     :capacity (compute-agg-capacity statk->w->sid->num uptime)
+     :cid+sid->input-stats (merge-with merge ack+fail lat+count)
+     :sid->output-stats out-stats}))
+
+(defn agg-pre-merge-comp-page-spout
+  "Builds the pre-merge component-page map for one spout executor from its
+  heartbeat data, for the given window: executor identity fields and
+  per-output-stream stats (:sid->output-stats). The include-sys? filter is
+  applied to emitted and transferred only."
+  [{exec-id :exec-id
+    host :host
+    port :port
+    uptime :uptime
+    comp-id :comp-id
+    num-tasks :num-tasks
+    statk->w->sid->num :stats}
+   window
+   include-sys?]
+  (let [drop-sys (mk-include-sys-filter include-sys?)
+        ;; the stat's map for `window`, after stringifying the window keys
+        in-window (fn [statk]
+                    (get (map-key str (statk statk->w->sid->num)) window))
+        lat+count (agg-spout-streams-lat-and-count
+                    (in-window :complete-latencies)
+                    (in-window :acked))
+        counts (swap-map-order
+                 {:acked (in-window :acked)
+                  :failed (in-window :failed)
+                  :emitted (drop-sys (in-window :emitted))
+                  :transferred (drop-sys (in-window :transferred))})]
+    {:executor-id exec-id
+     :host host
+     :port port
+     :uptime uptime
+     :num-executors 1
+     :num-tasks num-tasks
+     :sid->output-stats (merge-with merge lat+count counts)}))
+
+(defn agg-pre-merge-topo-page-bolt
+  "Builds the pre-merge topology-page entry {comp-id stats} for one bolt
+  executor, for the given window: latency totals and executed count,
+  per-stat sums over streams, and capacity. The include-sys? filter is
+  applied to emitted and transferred only."
+  [{comp-id :comp-id
+    num-tasks :num-tasks
+    statk->w->sid->num :stats
+    uptime :uptime}
+   window
+   include-sys?]
+  (let [drop-sys (mk-include-sys-filter include-sys?)
+        ;; the stat's map for `window`, after stringifying the window keys
+        in-window (fn [statk]
+                    (get (map-key str (statk statk->w->sid->num)) window))
+        total (fn [sid->num] (sum (vals sid->num)))
+        lat+count (agg-bolt-lat-and-count (in-window :execute-latencies)
+                                          (in-window :process-latencies)
+                                          (in-window :executed))]
+    {comp-id
+     (merge
+       lat+count
+       {:num-executors 1
+        :num-tasks num-tasks
+        :emitted (total (drop-sys (in-window :emitted)))
+        :transferred (total (drop-sys (in-window :transferred)))
+        :capacity (compute-agg-capacity statk->w->sid->num uptime)
+        :acked (total (in-window :acked))
+        :failed (total (in-window :failed))})}))
+
+(defn agg-pre-merge-topo-page-spout
+  "Builds the pre-merge topology-page entry {comp-id stats} for one spout
+  executor, for the given window: complete-latency total and acked count plus
+  per-stat sums over streams. The include-sys? filter is applied to emitted
+  and transferred only."
+  [{comp-id :comp-id
+    num-tasks :num-tasks
+    statk->w->sid->num :stats}
+   window
+   include-sys?]
+  (let [drop-sys (mk-include-sys-filter include-sys?)
+        ;; the stat's map for `window`, after stringifying the window keys
+        in-window (fn [statk]
+                    (get (map-key str (statk statk->w->sid->num)) window))
+        total (fn [sid->num] (sum (vals sid->num)))
+        lat+count (agg-spout-lat-and-count (in-window :complete-latencies)
+                                           (in-window :acked))]
+    {comp-id
+     (merge
+       lat+count
+       {:num-executors 1
+        :num-tasks num-tasks
+        :emitted (total (drop-sys (in-window :emitted)))
+        :transferred (total (drop-sys (in-window :transferred)))
+        :failed (total (in-window :failed))})}))
+
+(defn merge-agg-comp-stats-comp-page-bolt
+  "Merges the pre-merge component-page stats of one bolt executor into the
+  accumulated stats: bumps the executor count, sums task counts and
+  per-stream stats, and conj'es a per-executor summary (totals over streams
+  plus average execute/process latency, nil when nothing was executed) onto
+  :executor-stats."
+  [{acc-in :cid+sid->input-stats
+    acc-out :sid->output-stats
+    :as acc-bolt-stats}
+   {bolt-in :cid+sid->input-stats
+    bolt-out :sid->output-stats
+    :as bolt-stats}]
+  (let [merge-streams (partial merge-with (partial merge-with sum-or-0))
+        stream-total (fn [m k] (apply sum-or-0 (map k (vals m))))
+        executed (stream-total bolt-in :executed)
+        [exec-lat proc-lat] (if (and executed (pos? executed))
+                              [(div (stream-total bolt-in :executeLatencyTotal)
+                                    executed)
+                               (div (stream-total bolt-in :processLatencyTotal)
+                                    executed)]
+                              [nil nil])
+        exec-summary (merge
+                       (select-keys bolt-stats
+                                    [:executor-id :uptime :host :port :capacity])
+                       {:emitted (stream-total bolt-out :emitted)
+                        :transferred (stream-total bolt-out :transferred)
+                        :acked (stream-total bolt-in :acked)
+                        :failed (stream-total bolt-in :failed)
+                        :executed executed}
+                       {:execute-latency exec-lat
+                        :process-latency proc-lat})]
+    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
+     :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats))
+     :sid->output-stats (merge-streams acc-out bolt-out)
+     :cid+sid->input-stats (merge-streams acc-in bolt-in)
+     :executor-stats (conj (:executor-stats acc-bolt-stats) exec-summary)}))
+
+(defn merge-agg-comp-stats-comp-page-spout
+  "Merges the pre-merge component-page stats of one spout executor into the
+  accumulated stats: bumps the executor count, sums task counts and
+  per-stream stats, and conj'es a per-executor summary (totals over streams
+  plus average complete latency, nil when nothing was acked) onto
+  :executor-stats."
+  [{acc-out :sid->output-stats
+    :as acc-spout-stats}
+   {spout-out :sid->output-stats
+    :as spout-stats}]
+  (let [stream-total (fn [m k] (apply sum-or-0 (map k (vals m))))
+        acked (stream-total spout-out :acked)
+        complete-latency (when (and acked (pos? acked))
+                           (div (stream-total spout-out :completeLatencyTotal)
+                                acked))
+        exec-summary (merge
+                       (select-keys spout-stats
+                                    [:executor-id :uptime :host :port])
+                       {:emitted (stream-total spout-out :emitted)
+                        :transferred (stream-total spout-out :transferred)
+                        :acked acked
+                        :failed (stream-total spout-out :failed)}
+                       {:complete-latency complete-latency})]
+    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
+     :num-tasks (sum-or-0 (:num-tasks acc-spout-stats)
+                          (:num-tasks spout-stats))
+     :sid->output-stats (merge-with (partial merge-with sum-or-0)
+                                    acc-out
+                                    spout-out)
+     :executor-stats (conj (:executor-stats acc-spout-stats) exec-summary)}))
+
+(defn merge-agg-comp-stats-topo-page-bolt
+  "Merges one bolt executor's topology-page stats into the accumulated stats
+  of its component: bumps the executor count, keeps the larger capacity and
+  sums every other stat."
+  [acc-bolt-stats bolt-stats]
+  (let [total (fn [k] (sum-or-0 (k acc-bolt-stats) (k bolt-stats)))
+        ;; Latency totals (not averages) are summed here to avoid dividing
+        ;; at each step; the averages are computed later by dividing the
+        ;; total by the count.
+        summed-keys [:num-tasks
+                     :emitted
+                     :transferred
+                     :executeLatencyTotal
+                     :processLatencyTotal
+                     :executed
+                     :acked
+                     :failed]]
+    (into {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
+           :capacity (max-or-0 (:capacity acc-bolt-stats)
+                               (:capacity bolt-stats))}
+          (map (juxt identity total) summed-keys))))
+
+(defn merge-agg-comp-stats-topo-page-spout
+  "Merges one spout executor's topology-page stats into the accumulated stats
+  of its component: bumps the executor count and sums every other stat."
+  [acc-spout-stats spout-stats]
+  (let [total (fn [k] (sum-or-0 (k acc-spout-stats) (k spout-stats)))
+        ;; The latency total (not the average) is summed here to avoid
+        ;; dividing at each step; the average is computed later by dividing
+        ;; the total by the count.
+        summed-keys [:num-tasks
+                     :emitted
+                     :transferred
+                     :completeLatencyTotal
+                     :acked
+                     :failed]]
+    (into {:num-executors (inc (or (:num-executors acc-spout-stats) 0))}
+          (map (juxt identity total) summed-keys))))
+
+(defn aggregate-count-streams
+  "Replaces every value of `stats` (a map of per-stream counts) with the sum
+  of its counts."
+  [stats]
+  (let [total-count (fn [sid->count] (reduce + (vals sid->count)))]
+    (map-val total-count stats)))
+
+(defn- agg-topo-exec-stats*
+  "A helper function that does the common work to aggregate stats of one
+  executor with the given map for the topology page.
+  `acc-stats` is the accumulator and `new-data` the heartbeat data of one
+  executor. `pre-merge-fn` turns new-data into a {comp-id stats} entry,
+  `merge-fn` merges that entry into the per-component map stored under
+  `comp-key` (:bolt-id->stats or :spout-id->stats). Per-window acked and
+  failed totals are only accumulated for spouts. The result also carries the
+  executor's :type."
+  [window
+   include-sys?
+   {:keys [workers-set
+           bolt-id->stats
+           spout-id->stats
+           window->emitted
+           window->transferred
+           window->comp-lat-wgt-avg
+           window->acked
+           window->failed] :as acc-stats}
+   {:keys [stats] :as new-data}
+   pre-merge-fn
+   merge-fn
+   comp-key]
+  (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
+        {w->compLatWgtAvg :completeLatencyTotal
+         w->acked :acked}
+          (if (:complete-latencies stats)
+            (swap-map-order
+              (into {}
+                    (for [w (keys (:acked stats))]
+                         [w (agg-spout-lat-and-count
+                              (get (:complete-latencies stats) w)
+                              (get (:acked stats) w))])))
+            ;; No complete latencies: fall back to plain per-window acked
+            ;; counts. The key must be :acked to match the destructuring
+            ;; above (it used to be :acks, so the value was dropped).
+            {:completeLatencyTotal nil
+             :acked (aggregate-count-streams (:acked stats))})
+        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
+    (assoc {:workers-set (conj workers-set
+                               [(:host new-data) (:port new-data)])
+            :bolt-id->stats bolt-id->stats
+            :spout-id->stats spout-id->stats
+            :window->emitted (->> (:emitted stats)
+                                  (map-val handle-sys-components-fn)
+                                  aggregate-count-streams
+                                  (merge-with + window->emitted))
+            :window->transferred (->> (:transferred stats)
+                                      (map-val handle-sys-components-fn)
+                                      aggregate-count-streams
+                                      (merge-with + window->transferred))
+            :window->comp-lat-wgt-avg (merge-with +
+                                                  window->comp-lat-wgt-avg
+                                                  w->compLatWgtAvg)
+            :window->acked (if (= :spout (:type stats))
+                             (merge-with + window->acked w->acked)
+                             window->acked)
+            :window->failed (if (= :spout (:type stats))
+                              (->> (:failed stats)
+                                   aggregate-count-streams
+                                   (merge-with + window->failed))
+                              window->failed)}
+           ;; comp-key overrides the matching pass-through entry above with
+           ;; the merged per-component stats
+           comp-key (merge-with merge-fn
+                                (acc-stats comp-key)
+                                cid->statk->num)
+           :type (:type stats))))
+
+;; Arguments are [window include-sys? acc-stats new-data]; dispatch is on the
+;; :type of the last argument (the new executor data).
+(defmulti agg-topo-exec-stats
+  "Combines the aggregate stats of one executor with the given map, selecting
+  the appropriate window and including system components as specified."
+  (fn dispatch-fn [& args] (:type (last args))))
+
+;; Bolt executors are merged into :bolt-id->stats.
+(defmethod agg-topo-exec-stats :bolt
+  [window include-sys? acc-stats new-data]
+  (agg-topo-exec-stats* window
+                        include-sys?
+                        acc-stats
+                        new-data
+                        agg-pre-merge-topo-page-bolt
+                        merge-agg-comp-stats-topo-page-bolt
+                        :bolt-id->stats))
+
+;; Spout executors are merged into :spout-id->stats.
+(defmethod agg-topo-exec-stats :spout
+  [window include-sys? acc-stats new-data]
+  (agg-topo-exec-stats* window
+                        include-sys?
+                        acc-stats
+                        new-data
+                        agg-pre-merge-topo-page-spout
+                        merge-agg-comp-stats-topo-page-spout
+                        :spout-id->stats))
+
+;; Any other (or missing) :type leaves the accumulator unchanged.
+(defmethod agg-topo-exec-stats :default [_ _ acc-stats _] acc-stats)
+
+(defn get-last-error
+  "Returns an ErrorInfo built from the :error and :time-secs of the last
+  error the cluster state reports for the component, or nil when there is
+  none."
+  [storm-cluster-state storm-id component-id]
+  (when-let [last-err (.last-error storm-cluster-state storm-id component-id)]
+    (ErrorInfo. (:error last-err) (:time-secs last-err))))
+
+(defn component-type
+  "Returns the component type (either :bolt or :spout) for a given
+  topology and component id. Returns nil if not found. System ids are
+  reported as :bolt."
+  [^StormTopology topology id]
+  (let [bolts (.get_bolts topology)
+        spouts (.get_spouts topology)]
+    (cond
+      (or (Utils/isSystemId id) (.containsKey bolts id)) :bolt
+      (.containsKey spouts id) :spout
+      :else nil)))
+
+(defn extract-nodeinfos-from-hb-for-comp
+  "Returns the distinct {:host :port} maps of the executors whose component
+  (looked up from the executor's first task) equals comp-id, or of all
+  executors when comp-id is nil. System components are skipped unless
+  include-sys? is truthy."
+  [exec->host+port task->component include-sys? comp-id]
+  (->> exec->host+port
+       (filter (fn [[[start-task] _]]
+                 (let [id (task->component start-task)]
+                   (and (or (nil? comp-id) (= comp-id id))
+                        (or include-sys? (not (Utils/isSystemId id)))))))
+       (map (fn [[_ [host port]]]
+              {:host host
+               :port port}))
+       distinct))
+
+(defn extract-data-from-hb
+  "Builds one map per executor in exec->host+port from its heartbeat in
+  beats. The executor's component is looked up from its first task via
+  task->component. An executor is kept when comp-id is nil or equals its
+  component, and system components are dropped unless include-sys? is truthy.
+  :type comes from the heartbeat's stats when present and otherwise from
+  component-type. The five-argument arity applies no comp-id filter."
+  ([exec->host+port task->component beats include-sys? topology]
+   (extract-data-from-hb exec->host+port task->component beats include-sys?
+                         topology nil))
+  ([exec->host+port task->component beats include-sys? topology comp-id]
+   (for [[[first-task last-task :as executor] [host port]] exec->host+port
+         :let [heartbeat (beats executor)
+               component (task->component first-task)]
+         :when (and (or (nil? comp-id) (= comp-id component))
+                    (or include-sys? (not (Utils/isSystemId component))))
+         :let [exec-stats (:stats heartbeat)]]
+     {:exec-id executor
+      :comp-id component
+      :num-tasks (count (range first-task (inc last-task)))
+      :host host
+      :port port
+      :uptime (:uptime heartbeat)
+      :stats exec-stats
+      :type (or (:type exec-stats)
+                (component-type topology component))})))
+
+(defn aggregate-topo-stats
+  "Reduces `data` (a sequence of per-executor maps) with agg-topo-exec-stats,
+  starting from an accumulator whose worker set and stat maps are all empty.
+  `window` and `include-sys?` are passed unchanged to every reduction step."
+  [window include-sys? data]
+  (reduce (fn [acc exec-data]
+            (agg-topo-exec-stats window include-sys? acc exec-data))
+          {:workers-set #{}
+           :bolt-id->stats {}
+           :spout-id->stats {}
+           :window->emitted {}
+           :window->transferred {}
+           :window->comp-lat-wgt-avg {}
+           :window->acked {}
+           :window->failed {}}
+          data))
+
+(defn- compute-weighted-averages-per-window
+  "For every [window total] entry under wgt-avg-key in acc-data, divides the
+  total by the value for the same window under divisor-key. Windows whose
+  divisor is missing or not positive are left out. Result keys are the
+  windows converted with str."
+  [acc-data wgt-avg-key divisor-key]
+  (reduce (fn [result [window wgt-avg]]
+            (let [divisor ((divisor-key acc-data) window)]
+              (if (and divisor (pos? divisor))
+                (assoc result (str window) (div wgt-avg divisor))
+                result)))
+          {}
+          (wgt-avg-key acc-data)))
+
+(defn- post-aggregate-topo-stats
+  "Converts the accumulated topology-page data (acc-data) into its final map.
+
+  - :num-tasks and :num-executors are the sizes of task->component and
+    exec->node+port; :num-workers is the size of (:workers-set acc-data).
+  - Each bolt entry gains :execute-latency and :process-latency (its latency
+    totals divided by :executed, or 0 when nothing was executed), loses the
+    two ...LatencyTotal keys, and gains :lastError from (last-err-fn id).
+  - Each spout entry gains :complete-latency (its total divided by :acked, or
+    0 when nothing was acked), loses :completeLatencyTotal, and gains
+    :lastError from (last-err-fn id).
+  - The window maps are re-keyed with str; :window->complete-latency is the
+    per-window weighted total divided by the per-window acked count."
+  [task->component exec->node+port last-err-fn acc-data]
+  (let [;; A missing total is treated as 0 for bolts and spouts alike, so a
+        ;; partially filled entry can never hand nil to div.
+        avg-or-zero (fn [total cnt]
+                      (if (and cnt (pos? cnt))
+                        (div (or total 0) cnt)
+                        0))]
+    {:num-tasks (count task->component)
+     :num-workers (count (:workers-set acc-data))
+     :num-executors (count exec->node+port)
+     :bolt-id->stats
+     (into {} (for [[id m] (:bolt-id->stats acc-data)
+                    :let [executed (:executed m)]]
+                [id (-> m
+                        (assoc :execute-latency
+                               (avg-or-zero (:executeLatencyTotal m) executed)
+                               :process-latency
+                               (avg-or-zero (:processLatencyTotal m) executed))
+                        (dissoc :executeLatencyTotal
+                                :processLatencyTotal)
+                        (assoc :lastError (last-err-fn id)))]))
+     :spout-id->stats
+     (into {} (for [[id m] (:spout-id->stats acc-data)
+                    :let [acked (:acked m)]]
+                [id (-> m
+                        (assoc :complete-latency
+                               (avg-or-zero (:completeLatencyTotal m) acked))
+                        (dissoc :completeLatencyTotal)
+                        (assoc :lastError (last-err-fn id)))]))
+     :window->emitted (map-key str (:window->emitted acc-data))
+     :window->transferred (map-key str (:window->transferred acc-data))
+     :window->complete-latency
+     (compute-weighted-averages-per-window acc-data
+                                           :window->comp-lat-wgt-avg
+                                           :window->acked)
+     :window->acked (map-key str (:window->acked acc-data))
+     :window->failed (map-key str (:window->failed acc-data))}))
+
+(defn- thriftify-common-agg-stats
+  "Creates a CommonAggregateStats, copies onto it whichever of :num-executors,
+  :num-tasks, :emitted, :transferred, :acked and :failed are truthy in the
+  given map, and installs it on `s` with .set_common_stats."
+  [^ComponentAggregateStats s
+   {:keys [num-executors num-tasks emitted transferred acked failed]}]
+  (let [common (CommonAggregateStats.)]
+    (when num-executors (.set_num_executors common num-executors))
+    (when num-tasks (.set_num_tasks common num-tasks))
+    (when emitted (.set_emitted common emitted))
+    (when transferred (.set_transferred common transferred))
+    (when acked (.set_acked common acked))
+    (when failed (.set_failed common failed))
+    (.set_common_stats s common)))
+
+(defn thriftify-bolt-agg-stats
+  "Builds a ComponentAggregateStats of type BOLT from statk->num. The common
+  fields are filled by thriftify-common-agg-stats; :lastError,
+  :execute-latency, :process-latency, :executed and :capacity are each set
+  only when truthy."
+  [statk->num]
+  (let [{:keys [lastError execute-latency process-latency executed capacity]}
+        statk->num
+        bolt-specific (BoltAggregateStats.)
+        agg (ComponentAggregateStats.)]
+    (.set_type agg ComponentType/BOLT)
+    (when lastError (.set_last_error agg lastError))
+    (thriftify-common-agg-stats agg statk->num)
+    (when execute-latency
+      (.set_execute_latency_ms bolt-specific execute-latency))
+    (when process-latency
+      (.set_process_latency_ms bolt-specific process-latency))
+    (when executed (.set_executed bolt-specific executed))
+    (when capacity (.set_capacity bolt-specific capacity))
+    (.set_specific_stats agg (SpecificAggregateStats/bolt bolt-specific))
+    agg))
+
+(defn thriftify-spout-agg-stats
+  "Builds a ComponentAggregateStats of type SPOUT from statk->num. The common
+  fields are filled by thriftify-common-agg-stats; :lastError and
+  :complete-latency are each set only when truthy."
+  [statk->num]
+  (let [{:keys [lastError complete-latency]} statk->num
+        spout-specific (SpoutAggregateStats.)
+        agg (ComponentAggregateStats.)]
+    (.set_type agg ComponentType/SPOUT)
+    (when lastError (.set_last_error agg lastError))
+    (thriftify-common-agg-stats agg statk->num)
+    (when complete-latency
+      (.set_complete_latency_ms spout-specific complete-latency))
+    (.set_specific_stats agg (SpecificAggregateStats/spout spout-specific))
+    agg))
+
+(defn thriftify-topo-page-data
+  "Packs the topology-page map `data` into a TopologyPageInfo for
+  topology-id. Every spout/bolt entry is tagged with its :type and converted
+  with thriftify-spout-agg-stats / thriftify-bolt-agg-stats; the window maps
+  go into a TopologyStats."
+  [topology-id data]
+  (let [;; Tags each stats map with comp-type, then converts it.
+        thriftify-all (fn [comp-type thriftify-fn id->stats]
+                        (into {}
+                              (map (fn [[id m]]
+                                     [id (thriftify-fn
+                                           (assoc m :type comp-type))])
+                                   id->stats)))
+        spout-agg-stats (thriftify-all :spout
+                                       thriftify-spout-agg-stats
+                                       (:spout-id->stats data))
+        bolt-agg-stats (thriftify-all :bolt
+                                      thriftify-bolt-agg-stats
+                                      (:bolt-id->stats data))
+        window-stats (doto (TopologyStats.)
+                       (.set_window_to_emitted (:window->emitted data))
+                       (.set_window_to_transferred (:window->transferred data))
+                       (.set_window_to_complete_latencies_ms
+                         (:window->complete-latency data))
+                       (.set_window_to_acked (:window->acked data))
+                       (.set_window_to_failed (:window->failed data)))]
+    (doto (TopologyPageInfo. topology-id)
+      (.set_num_tasks (:num-tasks data))
+      (.set_num_workers (:num-workers data))
+      (.set_num_executors (:num-executors data))
+      (.set_id_to_spout_agg_stats spout-agg-stats)
+      (.set_id_to_bolt_agg_stats bolt-agg-stats)
+      (.set_topology_stats window-stats))))
+
+(defn agg-topo-execs-stats
+  "Aggregate various executor statistics for a topology from the given
+  heartbeats.
+
+  Extracts per-executor data with extract-data-from-hb, folds it with
+  aggregate-topo-stats for the given window and include-sys? flag, finishes
+  it with post-aggregate-topo-stats (which calls last-err-fn per component
+  id), and returns the result of thriftify-topo-page-data for topology-id."
+  [topology-id
+   exec->node+port
+   task->component
+   beats
+   topology
+   window
+   include-sys?
+   last-err-fn]
+  ;; This iterates over each executor one time, because of lazy evaluation.
+  (->> (extract-data-from-hb exec->node+port
+                             task->component
+                             beats
+                             include-sys?
+                             topology)
+       (aggregate-topo-stats window include-sys?)
+       (post-aggregate-topo-stats task->component exec->node+port last-err-fn)
+       (thriftify-topo-page-data topology-id)))
+
+(defn- agg-bolt-exec-win-stats
+  "Merges the windowed stats of one bolt executor (new-stats) into the
+  running per-window totals in acc-stats. The returned map holds only the
+  :window->... keys built here. Emitted and transferred counts are mapped
+  through the function from (mk-include-sys-filter include-sys?) before being
+  aggregated; acked and failed counts are aggregated as-is."
+  [acc-stats new-stats include-sys?]
+  (let [executed-by-window (:executed new-stats)
+        lat-and-count-by-window
+        (into {}
+              (map (fn [w]
+                     [w (agg-bolt-lat-and-count
+                          (get (:execute-latencies new-stats) w)
+                          (get (:process-latencies new-stats) w)
+                          (get executed-by-window w))])
+                   (keys executed-by-window)))
+        {w->execLatWgtAvg :executeLatencyTotal
+         w->procLatWgtAvg :processLatencyTotal
+         w->executed :executed} (swap-map-order lat-and-count-by-window)
+        sys-filter (mk-include-sys-filter include-sys?)
+        ;; Adds `addend` window-by-window to the running total under win-key.
+        add-to (fn [win-key addend]
+                 (merge-with + (win-key acc-stats) addend))]
+    {:window->emitted
+     (add-to :window->emitted
+             (aggregate-count-streams (map-val sys-filter (:emitted new-stats))))
+     :window->transferred
+     (add-to :window->transferred
+             (aggregate-count-streams
+               (map-val sys-filter (:transferred new-stats))))
+     :window->exec-lat-wgt-avg (add-to :window->exec-lat-wgt-avg w->execLatWgtAvg)
+     :window->proc-lat-wgt-avg (add-to :window->proc-lat-wgt-avg w->procLatWgtAvg)
+     :window->executed (add-to :window->executed w->executed)
+     :window->acked
+     (add-to :window->acked (aggregate-count-streams (:acked new-stats)))
+     :window->failed
+     (add-to :window->failed (aggregate-count-streams (:failed new-stats)))}))
+
+(defn- agg-spout-exec-win-stats
+  "Merges the windowed stats of one spout executor (new-stats) into the
+  running per-window totals in acc-stats. The returned map holds only the
+  :window->... keys built here. Emitted and transferred counts are mapped
+  through the function from (mk-include-sys-filter include-sys?) before being
+  aggregated; acked and failed counts are aggregated as-is."
+  [acc-stats new-stats include-sys?]
+  (let [acked-by-window (:acked new-stats)
+        lat-and-count-by-window
+        (into {}
+              (map (fn [w]
+                     [w (agg-spout-lat-and-count
+                          (get (:complete-latencies new-stats) w)
+                          (get acked-by-window w))])
+                   (keys acked-by-window)))
+        ;; Only the latency totals are used below; acked counts are
+        ;; re-aggregated from new-stats directly.
+        {w->compLatWgtAvg :completeLatencyTotal}
+        (swap-map-order lat-and-count-by-window)
+        sys-filter (mk-include-sys-filter include-sys?)
+        ;; Adds `addend` window-by-window to the running total under win-key.
+        add-to (fn [win-key addend]
+                 (merge-with + (win-key acc-stats) addend))]
+    {:window->emitted
+     (add-to :window->emitted
+             (aggregate-count-streams (map-val sys-filter (:emitted new-stats))))
+     :window->transferred
+     (add-to :window->transferred
+             (aggregate-count-streams
+               (map-val sys-filter (:transferred new-stats))))
+     :window->comp-lat-wgt-avg (add-to :window->comp-lat-wgt-avg w->compLatWgtAvg)
+     :window->acked
+     (add-to :window->acked (aggregate-count-streams acked-by-window))
+     :window->failed
+     (add-to :window->failed (aggregate-count-streams (:failed new-stats)))}))
+
+(defmulti agg-comp-exec-stats
+  "Folds one executor's data into the component-page accumulator for the
+  given window and include-sys? flag. Dispatches on the :type of the third
+  argument (the accumulator), not on the executor's data."
+  (fn dispatch-fn [_ _ init-val _] (:type init-val)))
+
+;; Bolt accumulator: windowed totals come from agg-bolt-exec-win-stats; the
+;; merged :stats and the :type tag are added on top.
+(defmethod agg-comp-exec-stats :bolt
+  [window include-sys? acc exec-data]
+  (-> (agg-bolt-exec-win-stats acc (:stats exec-data) include-sys?)
+      (assoc :stats (merge-agg-comp-stats-comp-page-bolt
+                      (:stats acc)
+                      (agg-pre-merge-comp-page-bolt exec-data
+                                                    window
+                                                    include-sys?))
+             :type :bolt)))
+
+;; Spout accumulator: windowed totals come from agg-spout-exec-win-stats; the
+;; merged :stats and the :type tag are added on top.
+(defmethod agg-comp-exec-stats :spout
+  [window include-sys? acc exec-data]
+  (-> (agg-spout-exec-win-stats acc (:stats exec-data) include-sys?)
+      (assoc :stats (merge-agg-comp-stats-comp-page-spout
+                      (:stats acc)
+                      (agg-pre-merge-comp-page-spout exec-data
+                                                     window
+                                                     include-sys?))
+             :type :spout)))
+
+(defn- aggregate-comp-stats*
+  "Reduces `data` with agg-comp-exec-stats, starting from init-val and
+  passing window and include-sys? to every step."
+  [window include-sys? data init-val]
+  (reduce (partial agg-comp-exec-stats window include-sys?)
+          init-val
+          data))
+
+(defmulti aggregate-comp-stats
+  "Aggregates per-executor data for a single component. Dispatches on the
+  :type of the first element of the final argument, so an empty data
+  sequence falls through to the :default method."
+  (fn dispatch-fn [& args] (-> args last first :type)))
+
+;; Bolt data: append an empty bolt accumulator to the arguments and delegate
+;; to aggregate-comp-stats*.
+(defmethod aggregate-comp-stats :bolt
+  [& args]
+  (let [empty-bolt-acc {:type :bolt
+                        :cid+sid->input-stats {}
+                        :sid->output-stats {}
+                        :executor-stats []
+                        :window->emitted {}
+                        :window->transferred {}
+                        :window->exec-lat-wgt-avg {}
+                        :window->executed {}
+                        :window->proc-lat-wgt-avg {}
+                        :window->acked {}
+                        :window->failed {}}]
+    (apply aggregate-comp-stats* (conj (vec args) empty-bolt-acc))))
+
+;; Spout data: append an empty spout accumulator to the arguments and
+;; delegate to aggregate-comp-stats*.
+(defmethod aggregate-comp-stats :spout
+  [& args]
+  (let [empty-spout-acc {:type :spout
+                         :sid->output-stats {}
+                         :executor-stats []
+                         :window->emitted {}
+                         :window->transferred {}
+                         :window->comp-lat-wgt-avg {}
+                         :window->acked {}
+                         :window->failed {}}]
+    (apply aggregate-comp-stats* (conj (vec args) empty-spout-acc))))
+
+;; Unknown type (including no data at all): nothing to aggregate.
+(defmethod aggregate-comp-stats :default [& _] {})
+
+(defmulti post-aggregate-comp-stats
+  "Turns the accumulated component-page data into its final map form.
+  Dispatches on the :type of the third argument. The first two arguments are
+  accepted but not read by any of the methods."
+  (fn [_ _ data] (:type data)))
+
+;; Bolt: per-input-stream latency totals become averages over :executed (0
+;; when nothing was executed) and the window maps are re-keyed with str.
+(defmethod post-aggregate-comp-stats :bolt
+  [task->component exec->host+port acc-data]
+  (let [comp-stats (:stats acc-data)
+        finish-input-stats
+        (fn [m]
+          (let [executed (:executed m)
+                avg-of (fn [total-key]
+                         (if (and executed (pos? executed))
+                           (div (or (total-key m) 0) executed)
+                           0))]
+            (-> m
+                (assoc :execute-latency (avg-of :executeLatencyTotal)
+                       :process-latency (avg-of :processLatencyTotal))
+                (dissoc :executeLatencyTotal :processLatencyTotal))))]
+    {:type (:type acc-data)
+     :num-tasks (:num-tasks comp-stats)
+     :num-executors (:num-executors comp-stats)
+     :cid+sid->input-stats (map-val finish-input-stats
+                                    (:cid+sid->input-stats comp-stats))
+     :sid->output-stats (:sid->output-stats comp-stats)
+     :executor-stats (:executor-stats comp-stats)
+     :window->emitted (map-key str (:window->emitted acc-data))
+     :window->transferred (map-key str (:window->transferred acc-data))
+     :window->execute-latency
+     (compute-weighted-averages-per-window acc-data
+                                           :window->exec-lat-wgt-avg
+                                           :window->executed)
+     :window->executed (map-key str (:window->executed acc-data))
+     :window->process-latency
+     (compute-weighted-averages-per-window acc-data
+                                           :window->proc-lat-wgt-avg
+                                           :window->executed)
+     :window->acked (map-key str (:window->acked acc-data))
+     :window->failed (map-key str (:window->failed acc-data))}))
+
+;; Spout: per-output-stream complete-latency totals become averages over
+;; :acked (0 when nothing was acked) and the window maps are re-keyed with
+;; str.
+(defmethod post-aggregate-comp-stats :spout
+  [task->component exec->host+port acc-data]
+  (let [comp-stats (:stats acc-data)
+        finish-output-stats
+        (fn [m]
+          (let [acked (:acked m)
+                complete-latency (if (and acked (pos? acked))
+                                   (div (or (:completeLatencyTotal m) 0) acked)
+                                   0)]
+            (-> m
+                (assoc :complete-latency complete-latency)
+                (dissoc :completeLatencyTotal))))]
+    {:type (:type acc-data)
+     :num-tasks (:num-tasks comp-stats)
+     :num-executors (:num-executors comp-stats)
+     :sid->output-stats (map-val finish-output-stats
+                                 (:sid->output-stats comp-stats))
+     :executor-stats (:executor-stats comp-stats)
+     :window->emitted (map-key str (:window->emitted acc-data))
+     :window->transferred (map-key str (:window->transferred acc-data))
+     :window->complete-latency
+     (compute-weighted-averages-per-window acc-data
+                                           :window->comp-lat-wgt-avg
+                                           :window->acked)
+     :window->acked (map-key str (:window->acked acc-data))
+     :window->failed (map-key str (:window->failed acc-data))}))
+
+;; Unknown type: nothing to post-process.
+(defmethod post-aggregate-comp-stats :default [& _] {})
+
+(defn thriftify-exec-agg-stats
+  "Builds an ExecutorAggregateStats for one executor of comp-id. The executor
+  summary is made from the two elements of :executor-id plus :host, :port and
+  :uptime (0 when missing). The stats are converted with the bolt or spout
+  thriftify function chosen by comp-type; any other comp-type throws
+  IllegalArgumentException."
+  [comp-id comp-type {:keys [executor-id host port uptime] :as stats}]
+  (doto (ExecutorAggregateStats.)
+    (.set_exec_summary
+      (ExecutorSummary. (apply (fn [task-start task-end]
+                                 (ExecutorInfo. task-start task-end))
+                               executor-id)
+                        comp-id
+                        host
+                        port
+                        (or uptime 0)))
+    (.set_stats ((case comp-type
+                   :bolt thriftify-bolt-agg-stats
+                   :spout thriftify-spout-agg-stats)
+                 stats))))
+
+(defn- thriftify-bolt-input-stats
+  "Returns a map in which every key of cid+sid->input-stats has been passed
+  through to-global-stream-id and every value through
+  thriftify-bolt-agg-stats."
+  [cid+sid->input-stats]
+  (into {}
+        (map (fn [[cid+sid input-stats]]
+               [(to-global-stream-id cid+sid)
+                (thriftify-bolt-agg-stats input-stats)])
+             cid+sid->input-stats)))
+
+(defn- thriftify-bolt-output-stats
+  "Applies thriftify-bolt-agg-stats to each value of sid->output-stats via
+  map-val."
+  [sid->output-stats]
+  (->> sid->output-stats
+       (map-val thriftify-bolt-agg-stats)))
+
+(defn- thriftify-spout-output-stats
+  "Applies thriftify-spout-agg-stats to each value of sid->output-stats via
+  map-val."
+  [sid->output-stats]
+  (->> sid->output-stats
+       (map-val thriftify-spout-agg-stats)))
+
+(defn thriftify-comp-page-data
+  "Packs the component-page map `data` into a ComponentPageInfo for comp-id.
+
+  The per-window maps in `data` are merged into one map keyed by stat name
+  (with the bolt- or spout-specific latency entries chosen by (:type data)),
+  passed through swap-map-order, and then converted per window. Which
+  converters are used, and the ComponentType, are chosen by
+  (component-type topology comp-id). :num-executors, :num-tasks and the input
+  stats are set on the result only when truthy."
+  [topo-id topology comp-id data]
+  ;; NOTE(review): swap-map-order presumably turns {stat {window v}} into
+  ;; {window {stat v}} -- confirm against its definition.
+  (let [w->stats (swap-map-order
+                   (merge
+                     {:emitted (:window->emitted data)
+                      :transferred (:window->transferred data)
+                      :acked (:window->acked data)
+                      :failed (:window->failed data)}
+                     (condp = (:type data)
+                       :bolt {:execute-latency (:window->execute-latency data)
+                              :process-latency (:window->process-latency data)
+                              :executed (:window->executed data)}
+                       :spout {:complete-latency
+                               (:window->complete-latency data)}
+                       {}))) ; default
+        ;; w->stats is rebound here: from this point on it names the
+        ;; thriftified per-window stats, not the raw map built above.
+        ;; This condp has no default clause, so it throws
+        ;; IllegalArgumentException when component-type returns neither :bolt
+        ;; nor :spout.
+        [compType exec-stats w->stats gsid->input-stats sid->output-stats]
+          (condp = (component-type topology comp-id)
+            :bolt [ComponentType/BOLT
+                   (->
+                     (partial thriftify-exec-agg-stats comp-id :bolt)
+                     (map (:executor-stats data)))
+                   (map-val thriftify-bolt-agg-stats w->stats)
+                   (thriftify-bolt-input-stats (:cid+sid->input-stats data))
+                   (thriftify-bolt-output-stats (:sid->output-stats data))]
+            :spout [ComponentType/SPOUT
+                    (->
+                      (partial thriftify-exec-agg-stats comp-id :spout)
+                      (map (:executor-stats data)))
+                    (map-val thriftify-spout-agg-stats w->stats)
+                    nil ;; spouts do not have input stats
+                    (thriftify-spout-output-stats (:sid->output-stats data))]),
+        num-executors (:num-executors data)
+        num-tasks (:num-tasks data)
+        ;; The topology name is explicitly set to nil here.
+        ;; NOTE(review): presumably filled in later by the caller -- confirm.
+        ret (doto (ComponentPageInfo. comp-id compType)
+              (.set_topology_id topo-id)
+              (.set_topology_name nil)
+              (.set_window_to_stats w->stats)
+              (.set_sid_to_output_stats sid->output-stats)
+              (.set_exec_stats exec-stats))]
+    ;; Optional fields: set only when a value is present.
+    (and num-executors (.set_num_executors ret num-executors))
+    (and num-tasks (.set_num_tasks ret num-tasks))
+    (and gsid->input-stats
+         (.set_gsid_to_input_stats ret gsid->input-stats))
+    ret))
+
+(defn agg-comp-execs-stats
+  "Aggregate various executor statistics for a component from the given
+  heartbeats.
+
+  Extracts the data of component-id's executors with extract-data-from-hb,
+  folds it with aggregate-comp-stats for the given window and include-sys?
+  flag, finishes it with post-aggregate-comp-stats, and returns the result of
+  thriftify-comp-page-data for topology-id, topology and component-id."
+  [exec->host+port
+   task->component
+   beats
+   window
+   include-sys?
+   topology-id
+   topology
+   component-id]
+  ;; This iterates over each executor one time, because of lazy evaluation.
+  (->> (extract-data-from-hb exec->host+port
+                             task->component
+                             beats
+                             include-sys?
+                             topology
+                             component-id)
+       (aggregate-comp-stats window include-sys?)
+       (post-aggregate-comp-stats task->component exec->host+port)
+       (thriftify-comp-page-data topology-id topology component-id)))
+
+(defn expand-averages
+  "Given nested maps avg and counts (both passed through
+  clojurify-structure), returns {slice {stream [(* count avg) count]}} for
+  every slice/stream present in counts. The average is read from avg at the
+  same [slice stream] path."
+  [avg counts]
+  (let [avg-by-slice (clojurify-structure avg)
+        counts-by-slice (clojurify-structure counts)
+        expand-slice (fn [slice streams]
+                       (into {}
+                             (map (fn [[stream c]]
+                                    [stream
+                                     [(* c (get-in avg-by-slice [slice stream]))
+                                      c]])
+                                  streams)))]
+    (into {}
+          (map (fn [[slice streams]]
+                 [slice (expand-slice slice streams)])
+               counts-by-slice))))
+
+(defn expand-averages-seq
+  "Expands each average/counts pair (taken position by position from the two
+  sequences) with expand-averages and merges the results, combining entries
+  for the same slice and stream with add-pairs."
+  [average-seq counts-seq]
+  (let [expanded (map expand-averages average-seq counts-seq)]
+    (apply merge-with (partial merge-with add-pairs) expanded)))
+
+(defn- val-avg
+  "Takes a [total count] pair and returns total / count as a double, or 0
+  when count is zero."
+  [[t c]]
+  ;; zero? rather than (= c 0): = is false for 0.0, which would let a
+  ;; floating-point zero count through to the division.
+  (if (zero? c)
+    0
+    (double (/ t c))))
+
+(defn aggregate-averages
+  "Combines the average/counts sequences with expand-averages-seq and then
+  turns every [total count] pair into an average with val-avg, at both
+  levels of the nested map."
+  [average-seq counts-seq]
+  (let [expanded (expand-averages-seq average-seq counts-seq)
+        average-streams (fn [streams] (map-val val-avg streams))]
+    (map-val average-streams expanded)))
+
+(defn aggregate-avg-streams
+  "Expands avg and counts with expand-averages, folds the [total count]
+  pairs of all streams within each slice together with add-pairs, and
+  converts each folded pair to an average with val-avg."
+  [avg counts]
+  (let [sum-pairs (fn [streams] (reduce add-pairs (vals streams)))
+        summed (map-val sum-pairs (expand-averages avg counts))]
+    (map-val val-avg summed)))
+
+(defn pre-process
+  "Returns stream-summary with its :emitted and :transferred entries
+  replaced by copies in which each window's stats have been filtered by key
+  using the predicate from (mk-include-sys-fn include-sys?). A missing entry
+  becomes an empty map."
+  [stream-summary include-sys?]
+  (let [filter-fn (mk-include-sys-fn include-sys?)
+        ;; Filters the per-window stat maps of one entry.
+        filter-windows (fn [window->stat]
+                         (into {}
+                               (for [[window stat] window->stat]
+                                 [window (filter-key filter-fn stat)])))]
+    ;; assoc replaces an existing key, so no dissoc is needed first.
+    (assoc stream-summary
+           :emitted (filter-windows (:emitted stream-summary))
+           :transferred (filter-windows (:transferred stream-summary)))))
+
+(defn aggregate-counts
+  "Passes every element of counts-seq through clojurify-structure and merges
+  the resulting two-level maps, adding together the values found at the same
+  nested key."
+  [counts-seq]
+  (let [add-inner (partial merge-with +)]
+    (apply merge-with add-inner (map clojurify-structure counts-seq))))
+
+(defn aggregate-common-stats
+  "Aggregates the emitted and transferred counts of the given ExecutorStats
+  with aggregate-counts, returning them under :emitted and :transferred."
+  [stats-seq]
+  (let [emitted-of (fn [^ExecutorStats s] (.get_emitted s))
+        transferred-of (fn [^ExecutorStats s] (.get_transferred s))]
+    {:emitted (aggregate-counts (map emitted-of stats-seq))
+     :transferred (aggregate-counts (map transferred-of stats-seq))}))
+
+(defn aggregate-bolt-stats
+  "Aggregates the bolt-specific stats of stats-seq (first passed through
+  collectify). The pre-processed common stats are merged with the summed
+  :acked, :failed and :executed counts and with :process-latencies (averages
+  paired with the acked counts) and :execute-latencies (averages paired with
+  the executed counts)."
+  [stats-seq include-sys?]
+  (let [all-stats (collectify stats-seq)
+        bolt-stats (map (fn [^ExecutorStats s] (.. s get_specific get_bolt))
+                        all-stats)
+        acked (map (fn [^BoltStats b] (.get_acked b)) bolt-stats)
+        failed (map (fn [^BoltStats b] (.get_failed b)) bolt-stats)
+        executed (map (fn [^BoltStats b] (.get_executed b)) bolt-stats)
+        process-avgs (map (fn [^BoltStats b] (.get_process_ms_avg b))
+                          bolt-stats)
+        execute-avgs (map (fn [^BoltStats b] (.get_execute_ms_avg b))
+                          bolt-stats)]
+    (merge (pre-process (aggregate-common-stats all-stats) include-sys?)
+           {:acked (aggregate-counts acked)
+            :failed (aggregate-counts failed)
+            :executed (aggregate-counts executed)
+            :process-latencies (aggregate-averages process-avgs acked)
+            :execute-latencies (aggregate-averages execute-avgs executed)})))
+
+(defn aggregate-spout-stats
+  "Aggregates the spout-specific stats of stats-seq (first passed through
+  collectify). The pre-processed common stats are merged with the summed
+  :acked and :failed counts and with :complete-latencies (averages paired
+  with the acked counts)."
+  [stats-seq include-sys?]
+  (let [all-stats (collectify stats-seq)
+        spout-stats (map (fn [^ExecutorStats s] (.. s get_specific get_spout))
+                         all-stats)
+        acked (map (fn [^SpoutStats sp] (.get_acked sp)) spout-stats)
+        failed (map (fn [^SpoutStats sp] (.get_failed sp)) spout-stats)
+        complete-avgs (map (fn [^SpoutStats sp] (.get_complete_ms_avg sp))
+                           spout-stats)]
+    (merge (pre-process (aggregate-common-stats all-stats) include-sys?)
+           {:acked (aggregate-counts acked)
+            :failed (aggregate-counts failed)
+            :complete-latencies (aggregate-averages complete-avgs acked)})))
+
+(defn get-filled-stats
+  "Returns the stats (.get_stats) of the given ExecutorSummary objects,
+  keeping only those accepted by not-nil?."
+  [summs]
+  (filter not-nil?
+          (map (fn [^ExecutorSummary summ] (.get_stats summ))
+               summs)))
+
+(defn aggregate-spout-streams
+  "Collapses the per-stream entries of aggregated spout stats: counts go
+  through aggregate-count-streams, and :complete-latencies through
+  aggregate-avg-streams paired with the :acked counts."
+  [stats]
+  (let [{:keys [acked failed emitted transferred complete-latencies]} stats]
+    {:acked (aggregate-count-streams acked)
+     :failed (aggregate-count-streams failed)
+     :emitted (aggregate-count-streams emitted)
+     :transferred (aggregate-count-streams transferred)
+     :complete-latencies (aggregate-avg-streams complete-latencies acked)}))
+
+(defn spout-streams-stats
+  "Takes the stats of the given executor summaries (get-filled-stats),
+  aggregates them as spout stats, and collapses their streams."
+  [summs include-sys?]
+  (-> summs
+      get-filled-stats
+      (aggregate-spout-stats include-sys?)
+      aggregate-spout-streams))
+
+(defn aggregate-bolt-streams
+  "Collapses the per-stream entries of aggregated bolt stats: counts go
+  through aggregate-count-streams, :process-latencies through
+  aggregate-avg-streams paired with the :acked counts, and
+  :execute-latencies through aggregate-avg-streams paired with the :executed
+  counts."
+  [stats]
+  (let [{:keys [acked failed emitted transferred executed
+                process-latencies execute-latencies]} stats]
+    {:acked (aggregate-count-streams acked)
+     :failed (aggregate-count-streams failed)
+     :emitted (aggregate-count-streams emitted)
+     :transferred (aggregate-count-streams transferred)
+     :process-latencies (aggregate-avg-streams process-latencies acked)
+     :executed (aggregate-count-streams executed)
+     :execute-latencies (aggregate-avg-streams execute-latencies executed)}))
+
+(defn compute-executor-capacity
+  "Computes executed * execute-latency / (1000 * window) for executor summary
+  e. The executed count and latency are read from the entry keyed by
+  (str TEN-MIN-IN-SECONDS) of the executor's aggregated bolt stats (system
+  streams included), with nil counted as zero. The window is the executor's
+  uptime when that is below TEN-MIN-IN-SECONDS, else TEN-MIN-IN-SECONDS.
+  Returns nil when the window is not greater than 0."
+  [^ExecutorSummary e]
+  (let [raw-stats (.get_stats e)
+        ten-min-stats (when raw-stats
+                        (-> raw-stats
+                            (aggregate-bolt-stats true)
+                            (aggregate-bolt-streams)
+                            swap-map-order
+                            (get (str TEN-MIN-IN-SECONDS))))
+        uptime (nil-to-zero (.get_uptime_secs e))
+        window (if (< uptime TEN-MIN-IN-SECONDS) uptime TEN-MIN-IN-SECONDS)
+        executed (nil-to-zero (:executed ten-min-stats))
+        latency (nil-to-zero (:execute-latencies ten-min-stats))]
+    (when (> window 0)
+      (div (* executed latency) (* 1000 window)))))
+
+(defn bolt-streams-stats
+  "Takes the stats of the given executor summaries (get-filled-stats),
+  aggregates them as bolt stats, and collapses their streams."
+  [summs include-sys?]
+  (-> summs
+      get-filled-stats
+      (aggregate-bolt-stats include-sys?)
+      aggregate-bolt-streams))
+
+(defn total-aggregate-stats
+  "Adds together the stream-collapsed stats of the spout and bolt executor
+  summaries. From the bolts only :emitted and :transferred are taken; all
+  keys of the spout stats are kept."
+  [spout-summs bolt-summs include-sys?]
+  (let [spout-totals (-> (get-filled-stats spout-summs)
+                         (aggregate-spout-stats include-sys?)
+                         aggregate-spout-streams)
+        bolt-totals (-> (get-filled-stats bolt-summs)
+                        (aggregate-bolt-stats include-sys?)
+                        aggregate-bolt-streams)
+        ;; Include only keys that will be used.  We want to count acked and
+        ;; failed only for the "tuple trees," so we do not include those keys
+        ;; from the bolt executors.
+        bolt-contribution (select-keys bolt-totals [:emitted :transferred])]
+    (merge-with (partial merge-with +)
+                bolt-contribution
+                spout-totals)))
+
+(defn error-subset
+  "Returns at most the first 200 characters of error-str as a string; a nil
+  input yields the empty string."
+  [error-str]
+  (->> error-str
+       (take 200)
+       (apply str)))
+
+(defn most-recent-error
+  "Returns the error-subset of the message of the ErrorInfo with the greatest
+  error time in errors-list, or the empty string when the list has no
+  entries."
+  [errors-list]
+  (if-let [latest (last (sort-by (fn [^ErrorInfo err]
+                                   (.get_error_time_secs err))
+                                 errors-list))]
+    (error-subset (.get_error ^ErrorInfo latest))
+    ""))
+
+(defn float-str
+  "Formats n with three decimal places. Returns \"0\" when n is nil or
+  false."
+  [n]
+  (if n
+    ;; double rather than float: single precision holds only about 7
+    ;; significant digits and rejects values outside its range.
+    (format "%.3f" (double n))
+    "0"))
+
+(defn compute-bolt-capacity
+  "Returns the largest capacity among the given executor summaries, as
+  computed by compute-executor-capacity with nil capacities counted as zero.
+  Returns 0 when executors is empty or nil."
+  [executors]
+  (let [capacities (->> executors
+                        (map compute-executor-capacity)
+                        (map nil-to-zero))]
+    ;; max needs at least one argument, so the empty case is handled
+    ;; explicitly instead of throwing.
+    (if (seq capacities)
+      (apply max capacities)
+      0)))

Reply via email to