Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r31266476
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +362,1222 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed, process latency, and execute latency across
    +  all streams.
    +  Each argument is a map keyed by stream: average execute latency, average
    +  process latency, and number executed. Returns a map with
    +  :executeLatencyTotal and :processLatencyTotal (each average multiplied by
    +  the stream's executed count, then combined with `sum`) and :executed (the
    +  counts combined with `sum`)."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  ;; No precondition on the key sets: weight-avg treats a missing count or a
    +  ;; nil average as 0, so maps with differing keys are tolerated. (A :pre
    +  ;; must be a vector of conditions; a bare list is asserted element by
    +  ;; element and would never compare anything.)
    +  (letfn [(weight-avg [[id avg]]
    +            (let [num-e (get idk->num-executed id)]
    +              (if (and avg num-e)
    +                (* avg num-e)
    +                0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams.
    +  sid->comp-avg maps a stream id to its average complete latency and
    +  sid->num-acked maps a stream id to its acked count. Returns a map with
    +  :completeLatencyTotal (each average multiplied by the stream's acked
    +  count, then combined with `sum`) and :acked (the counts combined with
    +  `sum`)."
    +  [sid->comp-avg sid->num-acked]
    +  ;; No precondition on the key sets: a :pre must be a vector of conditions,
    +  ;; and a bare list is asserted element by element without comparing
    +  ;; anything. Mismatched keys are handled by the nil guard below instead.
    +  (letfn [(weight-avg [[id avg]]
    +            (let [num-acked (get sid->num-acked id)]
    +              ;; Guard against a stream that has a latency but no acked
    +              ;; count (or a nil average); multiplying by nil would throw.
    +              (if (and avg num-acked)
    +                (* avg num-acked)
    +                0)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  "Adds two pairs position by position: [a1 a2] and [b1 b2] give
    +  [(+ a1 b1) (+ a2 b2)]. Called with no arguments it returns the identity
    +  pair [0 0]."
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   (mapv + [a1 a2] [b1 b2])))
    +
    +(defn mk-include-sys-fn
    +  "Returns a one-argument predicate over stream ids. When include-sys? is
    +  truthy the predicate accepts everything; otherwise it accepts only
    +  strings for which system-id? is false."
    +  [include-sys?]
    +  (if-not include-sys?
    +    (fn [stream]
    +      (if (string? stream)
    +        (not (system-id? stream))
    +        false))
    +    (fn [_] true)))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function over a map: `identity` when include-sys? is truthy,
    +  otherwise filter-key partially applied with the predicate from
    +  (mk-include-sys-fn false), which rejects system ids."
    +  [include-sys?]
    +  (if-not include-sys?
    +    (let [non-system-key? (mk-include-sys-fn false)]
    +      (partial filter-key non-system-key?))
    +    identity))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies per stream.
    +  Each argument is a map keyed by stream: average execute latency, average
    +  process latency, and number executed. Returns a map from every key of
    +  idk->exec-avg to {:executeLatencyTotal, :processLatencyTotal, :executed},
    +  where each total is the average multiplied by the executed count (0 when
    +  either is missing)."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  ;; No precondition on the key sets: a :pre must be a vector of conditions,
    +  ;; and a bare list is asserted element by element without comparing
    +  ;; anything. Missing entries are tolerated through `get` and the nil guard.
    +  (letfn [(weight-avg [id avg]
    +            (let [num-e (get idk->executed id)]
    +              (if (and avg num-e)
    +                (* avg num-e)
    +                0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        ;; `get` instead of invoking the maps, so a nil map (for example a
    +        ;; window with no process latencies yet) yields nil, not an NPE.
    +        [k {:executeLatencyTotal (weight-avg k (get idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (get idk->proc-avg k))
    +            :executed (get idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies per stream.
    +  idk->comp-avg maps a stream key to its average complete latency and
    +  idk->acked maps a stream key to its acked count. Returns a map from every
    +  key of idk->comp-avg to {:completeLatencyTotal, :acked}, where the total
    +  is the average multiplied by the acked count (0 when either is missing)."
    +  [idk->comp-avg idk->acked]
    +  ;; No precondition on the key sets: a :pre must be a vector of conditions,
    +  ;; and a bare list is asserted element by element without comparing
    +  ;; anything. weight-avg already copes with a key missing from idk->acked.
    +  (letfn [(weight-avg [id avg]
    +            (let [num-e (get idk->acked id)]
    +              (if (and avg num-e)
    +                (* avg num-e)
    +                0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "Inverts the two key levels of a nested map: outer keys become inner keys
    +  and inner keys become outer keys.
    +  Example:
    +  {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
    +  -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}
    +  Returns nil when m has no entries."
    +  [m]
    +  (let [;; One inverted map per outer entry: {inner-key {outer-key value}}.
    +        inverted (for [[outer-k inner-m] m]
    +                   (reduce (fn [acc [inner-k v]]
    +                             (assoc acc inner-k {outer-k v}))
    +                           {}
    +                           inner-m))]
    +    ;; Combine them, merging the single-entry maps that share an inner key.
    +    (apply merge-with merge inverted)))
    +
    +(defn- compute-agg-capacity
    +  "Computes the capacity metric for one executor given its heartbeat data
    +  and uptime. Returns nil when uptime is nil. Otherwise returns the
    +  count-weighted execute latency over the TEN-MIN-IN-SECONDS window,
    +  divided by 1000 times the smaller of uptime and TEN-MIN-IN-SECONDS."
    +  [m uptime]
    +  (when uptime
    +    (let [window (str TEN-MIN-IN-SECONDS)
    +          ;; Per stream: [latency * count, count].
    +          ;; NOTE(review): merge-with only combines keys present in both
    +          ;; maps; a stream found in just one would stay a bare number and
    +          ;; break add-pairs. Assumes both maps share their keys - confirm.
    +          sid->weighted+count (merge-with (fn [avg cnt] [(* avg cnt) cnt])
    +                                          (get (:execute-latencies m) window)
    +                                          (get (:executed m) window))
    +          ;; The stream ids are dropped; only the totals are needed.
    +          [weighted-avg _] (reduce add-pairs
    +                                   [0. 0]
    +                                   (vals sid->weighted+count))]
    +      (div weighted-avg (* 1000 (min uptime TEN-MIN-IN-SECONDS))))))
    +
    +(defn agg-pre-merge-comp-page-bolt
    +  "Shapes one bolt executor's data for the component page before merging.
    +  Takes the executor map (:exec-id, :host, :port, :uptime, :comp-id,
    +  :num-tasks, :stats), the window to select, and include-sys?. Returns the
    +  executor details plus :capacity, :cid+sid->input-stats and
    +  :sid->output-stats for that window. Only the output stats go through the
    +  system-id filter."
    +  [{exec-id :exec-id
    +    host :host
    +    port :port
    +    uptime :uptime
    +    comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [handle-sys-components-fn (mk-include-sys-filter include-sys?)
    +        ;; Stats of one kind for the requested window, after
    +        ;; (map-key str ...) over the per-window map.
    +        for-window (fn [statk]
    +                     (get (map-key str (get statk->w->sid->num statk))
    +                          window))
    +        input-counts (swap-map-order {:acked (for-window :acked)
    +                                      :failed (for-window :failed)})
    +        input-latencies (agg-bolt-streams-lat-and-count
    +                          (for-window :execute-latencies)
    +                          (for-window :process-latencies)
    +                          (for-window :executed))
    +        output-counts (swap-map-order
    +                        {:emitted (handle-sys-components-fn
    +                                    (for-window :emitted))
    +                         :transferred (handle-sys-components-fn
    +                                        (for-window :transferred))})]
    +    {:executor-id exec-id
    +     :host host
    +     :port port
    +     :uptime uptime
    +     :num-executors 1
    +     :num-tasks num-tasks
    +     :capacity (compute-agg-capacity statk->w->sid->num uptime)
    +     ;; Inner maps that share a stream key are merged together.
    +     :cid+sid->input-stats (merge-with merge input-counts input-latencies)
    +     :sid->output-stats output-counts}))
    +
    +(defn agg-pre-merge-comp-page-spout
    +  "Shapes one spout executor's data for the component page before merging.
    +  Takes the executor map (:exec-id, :host, :port, :uptime, :comp-id,
    +  :num-tasks, :stats), the window to select, and include-sys?. Returns the
    +  executor details plus :sid->output-stats for that window. Emitted and
    +  transferred counts go through the system-id filter; acked and failed do
    +  not."
    +  [{exec-id :exec-id
    +    host :host
    +    port :port
    +    uptime :uptime
    +    comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [handle-sys-components-fn (mk-include-sys-filter include-sys?)
    +        ;; Stats of one kind for the requested window, after
    +        ;; (map-key str ...) over the per-window map.
    +        for-window (fn [statk]
    +                     (get (map-key str (get statk->w->sid->num statk))
    +                          window))
    +        latencies (agg-spout-streams-lat-and-count
    +                    (for-window :complete-latencies)
    +                    (for-window :acked))
    +        counts (swap-map-order
    +                 {:acked (for-window :acked)
    +                  :failed (for-window :failed)
    +                  :emitted (handle-sys-components-fn (for-window :emitted))
    +                  :transferred (handle-sys-components-fn
    +                                 (for-window :transferred))})]
    +    {:executor-id exec-id
    +     :host host
    +     :port port
    +     :uptime uptime
    +     :num-executors 1
    +     :num-tasks num-tasks
    +     ;; Inner maps that share a stream id are merged, counts applied last.
    +     :sid->output-stats (merge-with merge latencies counts)}))
    +
    +(defn agg-pre-merge-topo-page-bolt
    +  "Shapes one bolt executor's data for the topology page before merging.
    +  Takes the executor map (:comp-id, :num-tasks, :stats, :uptime), the
    +  window to select, and include-sys?. Returns a single-entry map from
    +  comp-id to that executor's totals across streams: latency totals,
    +  :executed, :emitted, :transferred, :acked, :failed, :capacity,
    +  :num-tasks and :num-executors 1. Only emitted and transferred go through
    +  the system-id filter."
    +  [{comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats
    +    uptime :uptime}
    +   window
    +   include-sys?]
    +  (let [handle-sys-components-fn (mk-include-sys-filter include-sys?)
    +        ;; Stats of one kind for the requested window, after
    +        ;; (map-key str ...) over the per-window map.
    +        for-window (fn [statk]
    +                     (get (map-key str (get statk->w->sid->num statk))
    +                          window))
    +        ;; Total over all streams, with and without the system-id filter.
    +        total (fn [statk] (sum (vals (for-window statk))))
    +        filtered-total (fn [statk]
    +                         (sum (vals (handle-sys-components-fn
    +                                      (for-window statk)))))
    +        latencies+executed (agg-bolt-lat-and-count
    +                             (for-window :execute-latencies)
    +                             (for-window :process-latencies)
    +                             (for-window :executed))]
    +    {comp-id
    +     (merge latencies+executed
    +            {:num-executors 1
    +             :num-tasks num-tasks
    +             :emitted (filtered-total :emitted)
    +             :transferred (filtered-total :transferred)
    +             :capacity (compute-agg-capacity statk->w->sid->num uptime)
    +             :acked (total :acked)
    +             :failed (total :failed)})}))
    +
    +(defn agg-pre-merge-topo-page-spout
    +  "Shapes one spout executor's data for the topology page before merging.
    +  Takes the executor map (:comp-id, :num-tasks, :stats), the window to
    +  select, and include-sys?. Returns a single-entry map from comp-id to that
    +  executor's totals across streams: :completeLatencyTotal, :acked,
    +  :emitted, :transferred, :failed, :num-tasks and :num-executors 1. Only
    +  emitted and transferred go through the system-id filter."
    +  [{comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [handle-sys-components-fn (mk-include-sys-filter include-sys?)
    +        ;; Stats of one kind for the requested window, after
    +        ;; (map-key str ...) over the per-window map.
    +        for-window (fn [statk]
    +                     (get (map-key str (get statk->w->sid->num statk))
    +                          window))
    +        filtered-total (fn [statk]
    +                         (sum (vals (handle-sys-components-fn
    +                                      (for-window statk)))))
    +        latency+acked (agg-spout-lat-and-count
    +                        (for-window :complete-latencies)
    +                        (for-window :acked))]
    +    {comp-id
    +     (merge latency+acked
    +            {:num-executors 1
    +             :num-tasks num-tasks
    +             :emitted (filtered-total :emitted)
    +             :transferred (filtered-total :transferred)
    +             :failed (sum (vals (for-window :failed)))})}))
    +
    +(defn apply-default
    +  "Calls f with every element of args first passed through defaulting-fn."
    +  [f defaulting-fn & args]
    +  (->> args
    +       (map defaulting-fn)
    +       (apply f)))
    +
    +(defn apply-or-0
    +  "Calls f on args, with every falsey argument replaced by 0."
    +  [f & args]
    +  (letfn [(zero-if-missing [x] (or x 0))]
    +    (apply apply-default f zero-if-missing args)))
    +
    +(defn sum-or-0
    +  "Adds the arguments with `+` via apply-or-0, so falsey arguments count
    +  as 0."
    +  [& args]
    +  (apply (partial apply-or-0 +) args))
    +
    +(defn max-or-0
    +  "Takes the `max` of the arguments via apply-or-0, so falsey arguments
    +  count as 0."
    +  [& args]
    +  (apply (partial apply-or-0 max) args))
    +
    +(defn merge-agg-comp-stats-comp-page-bolt
    +  "Folds one bolt executor's component-page stats into the accumulator.
    +  Increments :num-executors, adds :num-tasks, adds the per-stream input
    +  and output stats with sum-or-0, and conj-es a per-executor summary onto
    +  :executor-stats. The summary's :execute-latency and :process-latency are
    +  nil unless the executor has a positive executed count."
    +  [{acc-in :cid+sid->input-stats
    +    acc-out :sid->output-stats
    +    :as acc-bolt-stats}
    +   {bolt-in :cid+sid->input-stats
    +    bolt-out :sid->output-stats
    +    :as bolt-stats}]
    +  (let [;; Two-level merge that adds the leaf numbers.
    +        merge-streams (partial merge-with (partial merge-with sum-or-0))
    +        ;; Total of stat k over every stream of m.
    +        stream-total (fn [m k] (apply sum-or-0 (map k (vals m))))
    +        executed (stream-total bolt-in :executed)
    +        has-executed? (and executed (pos? executed))
    +        executor-summary
    +        (merge
    +          (select-keys bolt-stats
    +                       [:executor-id :uptime :host :port :capacity])
    +          {:emitted (stream-total bolt-out :emitted)
    +           :transferred (stream-total bolt-out :transferred)
    +           :acked (stream-total bolt-in :acked)
    +           :failed (stream-total bolt-in :failed)
    +           :executed executed}
    +          ;; Average latency = latency total / executed count.
    +          {:execute-latency
    +           (when has-executed?
    +             (div (stream-total bolt-in :executeLatencyTotal) executed))
    +           :process-latency
    +           (when has-executed?
    +             (div (stream-total bolt-in :processLatencyTotal) executed))})]
    +    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
    +     :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats)
    +                          (:num-tasks bolt-stats))
    +     :sid->output-stats (merge-streams acc-out bolt-out)
    +     :cid+sid->input-stats (merge-streams acc-in bolt-in)
    +     :executor-stats (conj (:executor-stats acc-bolt-stats)
    +                           executor-summary)}))
    +
    +(defn merge-agg-comp-stats-comp-page-spout
    +  "Folds one spout executor's component-page stats into the accumulator.
    +  Increments :num-executors, adds :num-tasks, adds the per-stream output
    +  stats with sum-or-0, and conj-es a per-executor summary onto
    +  :executor-stats. The summary's :complete-latency is nil unless the
    +  executor has a positive acked count."
    +  [{acc-out :sid->output-stats
    +    :as acc-spout-stats}
    +   {spout-out :sid->output-stats
    +    :as spout-stats}]
    +  (let [;; Total of stat k over every stream of m.
    +        stream-total (fn [m k] (apply sum-or-0 (map k (vals m))))
    +        acked (stream-total spout-out :acked)
    +        ;; Average latency = latency total / acked count.
    +        complete-latency (when (and acked (pos? acked))
    +                           (div (stream-total spout-out
    +                                              :completeLatencyTotal)
    +                                acked))
    +        executor-summary
    +        (merge
    +          (select-keys spout-stats [:executor-id :uptime :host :port])
    +          {:emitted (stream-total spout-out :emitted)
    +           :transferred (stream-total spout-out :transferred)
    +           :acked acked
    +           :failed (stream-total spout-out :failed)}
    +          {:complete-latency complete-latency})]
    +    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
    +     :num-tasks (sum-or-0 (:num-tasks acc-spout-stats)
    +                          (:num-tasks spout-stats))
    +     :sid->output-stats (merge-with (partial merge-with sum-or-0)
    +                                    acc-out
    +                                    spout-out)
    +     :executor-stats (conj (:executor-stats acc-spout-stats)
    +                           executor-summary)}))
    +
    +(defn merge-agg-comp-stats-topo-page-bolt
    +  "Folds one bolt executor's topology-page stats into the accumulator for
    +  its component. Increments :num-executors, keeps the larger :capacity,
    +  and adds every other stat with sum-or-0."
    +  [acc-bolt-stats bolt-stats]
    +  (let [added (fn [k] (sum-or-0 (k acc-bolt-stats) (k bolt-stats)))
    +        ;; Latency totals are added here, not averaged, to avoid dividing
    +        ;; at each step. The average is total / count, computed later.
    +        summed-keys [:num-tasks
    +                     :emitted
    +                     :transferred
    +                     :executeLatencyTotal
    +                     :processLatencyTotal
    +                     :executed
    +                     :acked
    +                     :failed]]
    +    (into {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
    +           :capacity (max-or-0 (:capacity acc-bolt-stats)
    +                               (:capacity bolt-stats))}
    +          (for [k summed-keys]
    +            [k (added k)]))))
    +
    +(defn merge-agg-comp-stats-topo-page-spout
    +  "Folds one spout executor's topology-page stats into the accumulator for
    +  its component. Increments :num-executors and adds every other stat with
    +  sum-or-0."
    +  [acc-spout-stats spout-stats]
    +  (let [added (fn [k] (sum-or-0 (k acc-spout-stats) (k spout-stats)))
    +        ;; The latency total is added here, not averaged, to avoid dividing
    +        ;; at each step. The average is total / count, computed later.
    +        summed-keys [:num-tasks
    +                     :emitted
    +                     :transferred
    +                     :completeLatencyTotal
    +                     :acked
    +                     :failed]]
    +    (into {:num-executors (inc (or (:num-executors acc-spout-stats) 0))}
    +          (for [k summed-keys]
    +            [k (added k)]))))
    +
    +(defn aggregate-count-streams
    +  "Applies, through map-val, a function that adds up the values of each
    +  nested map in stats."
    +  [stats]
    +  (letfn [(stream-total [sid->num] (reduce + (vals sid->num)))]
    +    (map-val stream-total stats)))
    +
    +(defn- agg-topo-exec-stats*
    +  "A helper function that does the common work to aggregate stats of one
    +  executor with the given map for the topology page.
    +  acc-stats is the accumulator and new-data is one executor's data.
    +  pre-merge-fn turns new-data into a single-entry map from component id to
    +  stats, merge-fn combines two such stats maps, and comp-key
    +  (:bolt-id->stats or :spout-id->stats) names the accumulator entry to
    +  update. Returns the new accumulator, with :type set to the executor's
    +  stats type. Per-window acked, failed and complete-latency totals are only
    +  updated from stats whose :type is :spout or that carry
    +  :complete-latencies."
    +  [window
    +   include-sys?
    +   {:keys [workers-set
    +           bolt-id->stats
    +           spout-id->stats
    +           window->emitted
    +           window->transferred
    +           window->comp-lat-wgt-avg
    +           window->acked
    +           window->failed] :as acc-stats}
    +   {:keys [stats] :as new-data}
    +   pre-merge-fn
    +   merge-fn
    +   comp-key]
    +  (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
    +        spout? (= :spout (:type stats))
    +        {w->compLatWgtAvg :completeLatencyTotal
    +         w->acked :acked}
    +          (if (:complete-latencies stats)
    +            ;; Per window: weighted latency total and acked count.
    +            (swap-map-order
    +              (into {}
    +                    (for [w (keys (:acked stats))]
    +                         [w (agg-spout-lat-and-count
    +                              (get (:complete-latencies stats) w)
    +                              (get (:acked stats) w))])))
    +            ;; No latencies reported: still carry the acked counts. The key
    +            ;; must be :acked to match the destructuring above; under any
    +            ;; other key the counts would be silently dropped.
    +            {:completeLatencyTotal nil
    +             :acked (aggregate-count-streams (:acked stats))})
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    (assoc {:workers-set (conj workers-set
    +                               [(:host new-data) (:port new-data)])
    +            :bolt-id->stats bolt-id->stats
    +            :spout-id->stats spout-id->stats
    +            :window->emitted (->> (:emitted stats)
    +                                  (map-val handle-sys-components-fn)
    +                                  aggregate-count-streams
    +                                  (merge-with + window->emitted))
    +            :window->transferred (->> (:transferred stats)
    +                                      (map-val handle-sys-components-fn)
    +                                      aggregate-count-streams
    +                                      (merge-with + window->transferred))
    +            :window->comp-lat-wgt-avg (merge-with +
    +                                                  window->comp-lat-wgt-avg
    +                                                  w->compLatWgtAvg)
    +            ;; Topology-level acked/failed only count spout executors.
    +            :window->acked (if spout?
    +                             (merge-with + window->acked w->acked)
    +                             window->acked)
    +            :window->failed (if spout?
    +                              (->> (:failed stats)
    +                                   aggregate-count-streams
    +                                   (merge-with + window->failed))
    +                              window->failed)}
    +           ;; Written after the literal, so it replaces the comp-key entry
    +           ;; copied from the accumulator above.
    +           comp-key (merge-with merge-fn
    +                                (acc-stats comp-key)
    +                                cid->statk->num)
    +           :type (:type stats))))
    +
    +(defmulti agg-topo-exec-stats
    +  "Combines the aggregate stats of one executor with the given map,
    +  selecting the appropriate window and including system components as
    +  specified. Dispatches on the :type of the last argument (the executor's
    +  data)."
    +  (fn dispatch-fn [& args] (-> args last :type)))
    +
    +;; Bolt executors are folded into :bolt-id->stats.
    +(defmethod agg-topo-exec-stats :bolt
    +  [window include-sys? acc-stats new-data]
    +  (agg-topo-exec-stats* window include-sys? acc-stats new-data
    +                        agg-pre-merge-topo-page-bolt
    +                        merge-agg-comp-stats-topo-page-bolt
    +                        :bolt-id->stats))
    +
    +;; Spout executors are folded into :spout-id->stats.
    +(defmethod agg-topo-exec-stats :spout
    +  [window include-sys? acc-stats new-data]
    +  (agg-topo-exec-stats* window include-sys? acc-stats new-data
    +                        agg-pre-merge-topo-page-spout
    +                        merge-agg-comp-stats-topo-page-spout
    +                        :spout-id->stats))
    +
    +;; Any other (or missing) type leaves the accumulator untouched.
    +(defmethod agg-topo-exec-stats :default
    +  [_ _ acc-stats _]
    +  acc-stats)
    +
    +(defn get-last-error
    +  "Asks storm-cluster-state for the last error of the given component in
    +  the given topology and wraps its :error and :time-secs in an ErrorInfo.
    +  Returns nil when there is no such error."
    +  [storm-cluster-state storm-id component-id]
    +  (when-let [last-err (.last-error storm-cluster-state
    +                                   storm-id
    +                                   component-id)]
    +    (ErrorInfo. (:error last-err) (:time-secs last-err))))
    +
    +(defn component-type
    +  "Looks id up in the topology's bolts first, then in its spouts. Returns
    +  :bolt or :spout accordingly, or nil if id is in neither."
    +  [^StormTopology topology id]
    +  (if (.containsKey (.get_bolts topology) id)
    +    :bolt
    +    (when (.containsKey (.get_spouts topology) id)
    +      :spout)))
    +
    +(defn extract-data-from-hb
    +  "Builds a lazy sequence with one map per executor in exec->host+port,
    +  combining its assignment with its heartbeat from beats. Each map has
    +  :exec-id, :comp-id, :num-tasks, :host, :port, :uptime, :stats and :type.
    +  Executors are skipped when comp-id is non-nil and differs from their
    +  component, or when include-sys? is falsey and their component is a system
    +  id. The five-argument form applies no component filter."
    +  ([exec->host+port task->component beats include-sys? topology]
    +   (extract-data-from-hb exec->host+port
    +                         task->component
    +                         beats
    +                         include-sys?
    +                         topology
    +                         nil))
    +  ([exec->host+port task->component beats include-sys? topology comp-id]
    +   (for [[[start end :as executor] [host port]] exec->host+port
    +         :let [beat (beats executor)
    +               id (task->component start)]
    +         :when (or (nil? comp-id) (= comp-id id))
    +         :when (or include-sys? (not (system-id? id)))]
    +     (let [stats (:stats beat)]
    +       {:exec-id executor
    +        :comp-id id
    +        ;; An executor covers the inclusive task range start..end.
    +        :num-tasks (count (range start (inc end)))
    +        :host host
    +        :port port
    +        :uptime (:uptime beat)
    +        :stats stats
    +        ;; Fall back to the topology when the stats carry no :type.
    +        :type (or (:type stats)
    +                  (component-type topology id))}))))
    +
    +(defn aggregate-topo-stats
    +  "Reduces the per-executor maps in data into one accumulator with
    +  agg-topo-exec-stats, starting from an empty worker set and empty stats
    +  maps."
    +  [window include-sys? data]
    +  (reduce (fn [acc exec-data]
    +            (agg-topo-exec-stats window include-sys? acc exec-data))
    +          {:workers-set #{}
    +           :bolt-id->stats {}
    +           :spout-id->stats {}
    +           :window->emitted {}
    +           :window->transferred {}
    +           :window->comp-lat-wgt-avg {}
    +           :window->acked {}
    +           :window->failed {}}
    +          data))
    +
    +(defn- compute-weighted-averages-per-window
    +  "For each window under wgt-avg-key in acc-data, divides the weighted
    +  total by that window's value under divisor-key. Windows with a missing
    +  or non-positive divisor are left out. Result keys are the windows passed
    +  through `str`."
    +  [acc-data wgt-avg-key divisor-key]
    +  (let [window->divisor (divisor-key acc-data)]
    +    (reduce (fn [result [window wgt-avg]]
    +              (let [divisor (window->divisor window)]
    +                (if (and divisor (pos? divisor))
    +                  (assoc result (str window) (div wgt-avg divisor))
    +                  result)))
    +            {}
    +            (wgt-avg-key acc-data))))
    +
    +(defn- post-aggregate-topo-stats
    +  "Turns the accumulated topology stats into the final topology-page data.
    +  Counts tasks, workers and executors. For each bolt and spout it replaces
    +  the latency totals with averages (total divided by the executed or acked
    +  count, or 0 when that count is missing or not positive) and attaches
    +  :lastError from last-err-fn. The per-window maps are re-keyed with
    +  (map-key str ...). Average latencies are stored under :execute-latency,
    +  :process-latency and :complete-latency, the keys that
    +  thriftify-bolt-agg-stats and thriftify-spout-agg-stats read."
    +  [task->component exec->node+port last-err-fn acc-data]
    +  {:num-tasks (count task->component)
    +   :num-workers (count (:workers-set acc-data))
    +   :num-executors (count exec->node+port)
    +   :bolt-id->stats
    +     (into {} (for [[id m] (:bolt-id->stats acc-data)
    +                    :let [executed (:executed m)]]
    +                     [id (-> m
    +                             (assoc :execute-latency
    +                                    (if (and executed (pos? executed))
    +                                      (div (or (:executeLatencyTotal m) 0)
    +                                           executed)
    +                                      0)
    +                                    :process-latency
    +                                    (if (and executed (pos? executed))
    +                                      (div (or (:processLatencyTotal m) 0)
    +                                           executed)
    +                                      0))
    +                             (dissoc :executeLatencyTotal
    +                                     :processLatencyTotal)
    +                             (assoc :lastError (last-err-fn id)))]))
    +   :spout-id->stats
    +     (into {} (for [[id m] (:spout-id->stats acc-data)
    +                    :let [acked (:acked m)]]
    +                    [id (-> m
    +                            ;; Must be :complete-latency (not
    +                            ;; :completeLatency): that is the key the spout
    +                            ;; thrift conversion destructures.
    +                            (assoc :complete-latency
    +                                   (if (and acked (pos? acked))
    +                                     (div (or (:completeLatencyTotal m) 0)
    +                                          acked)
    +                                     0))
    +                            (dissoc :completeLatencyTotal)
    +                            (assoc :lastError (last-err-fn id)))]))
    +   :window->emitted (map-key str (:window->emitted acc-data))
    +   :window->transferred (map-key str (:window->transferred acc-data))
    +   :window->complete-latency
    +     (compute-weighted-averages-per-window acc-data
    +                                           :window->comp-lat-wgt-avg
    +                                           :window->acked)
    +   :window->acked (map-key str (:window->acked acc-data))
    +   :window->failed (map-key str (:window->failed acc-data))})
    +
    +(defn- thriftify-common-agg-stats
    +  "Creates a CommonAggregateStats, sets each field whose value in
    +  statk->num is truthy, and stores it on s with .set_common_stats."
    +  [^ComponentAggregateStats s
    +   {:keys [num-tasks
    +           emitted
    +           transferred
    +           acked
    +           failed
    +           num-executors] :as statk->num}]
    +  (let [common (CommonAggregateStats.)]
    +    ;; Absent stats are left unset rather than written as nil.
    +    (when num-executors (.set_num_executors common num-executors))
    +    (when num-tasks (.set_num_tasks common num-tasks))
    +    (when emitted (.set_emitted common emitted))
    +    (when transferred (.set_transferred common transferred))
    +    (when acked (.set_acked common acked))
    +    (when failed (.set_failed common failed))
    +    (.set_common_stats s common)))
    +
    +(defn thriftify-bolt-agg-stats
    +  "Builds a ComponentAggregateStats of type BOLT from statk->num: the last
    +  error, the common stats, and a BoltAggregateStats holding execute
    +  latency, process latency, executed count and capacity. Only truthy
    +  values are set."
    +  [statk->num]
    +  (let [{:keys [lastError
    +                execute-latency
    +                process-latency
    +                executed
    +                capacity]} statk->num
    +        agg-stats (ComponentAggregateStats.)]
    +    (.set_type agg-stats ComponentType/BOLT)
    +    (when lastError (.set_last_error agg-stats lastError))
    +    (thriftify-common-agg-stats agg-stats statk->num)
    +    (let [bolt-stats (BoltAggregateStats.)]
    +      (when execute-latency
    +        (.set_execute_latency_ms bolt-stats execute-latency))
    +      (when process-latency
    +        (.set_process_latency_ms bolt-stats process-latency))
    +      (when executed (.set_executed bolt-stats executed))
    +      (when capacity (.set_capacity bolt-stats capacity))
    +      (.set_specific_stats agg-stats
    +                           (SpecificAggregateStats/bolt bolt-stats)))
    +    agg-stats))
    +
    +(defn thriftify-spout-agg-stats
    +  "Builds a ComponentAggregateStats of type SPOUT from statk->num: the last
    +  error, the common stats, and a SpoutAggregateStats holding the complete
    +  latency read from :complete-latency. Only truthy values are set."
    +  [statk->num]
    +  (let [{:keys [lastError
    +                complete-latency]} statk->num
    +        agg-stats (ComponentAggregateStats.)]
    +    (.set_type agg-stats ComponentType/SPOUT)
    +    (when lastError (.set_last_error agg-stats lastError))
    +    (thriftify-common-agg-stats agg-stats statk->num)
    +    (let [spout-stats (SpoutAggregateStats.)]
    +      (when complete-latency
    +        (.set_complete_latency_ms spout-stats complete-latency))
    +      (.set_specific_stats agg-stats
    +                           (SpecificAggregateStats/spout spout-stats)))
    +    agg-stats))
    +
    +(defn thriftify-topo-page-data
    +  "Converts the topology-page data map into a TopologyPageInfo for
    +  topology-id: task, worker and executor counts, the per-component spout
    +  and bolt aggregate stats, and a TopologyStats with the per-window maps."
    +  [topology-id data]
    +  (let [{:keys [num-tasks
    +                num-workers
    +                num-executors
    +                spout-id->stats
    +                bolt-id->stats
    +                window->emitted
    +                window->transferred
    +                window->complete-latency
    +                window->acked
    +                window->failed]} data
    +        ;; Tags every stats map with its component type, then converts it.
    +        thriftify-all (fn [comp-type thriftify-fn id->stats]
    +                        (into {}
    +                              (map (fn [[id m]]
    +                                     [id (thriftify-fn
    +                                           (assoc m :type comp-type))])
    +                                   id->stats)))
    +        spout-agg-stats (thriftify-all :spout
    +                                       thriftify-spout-agg-stats
    +                                       spout-id->stats)
    +        bolt-agg-stats (thriftify-all :bolt
    +                                      thriftify-bolt-agg-stats
    +                                      bolt-id->stats)
    +        topology-stats (doto (TopologyStats.)
    +                         (.set_window_to_emitted window->emitted)
    +                         (.set_window_to_transferred window->transferred)
    +                         (.set_window_to_complete_latencies_ms
    +                           window->complete-latency)
    +                         (.set_window_to_acked window->acked)
    +                         (.set_window_to_failed window->failed))]
    +    (doto (TopologyPageInfo. topology-id)
    +      (.set_num_tasks num-tasks)
    +      (.set_num_workers num-workers)
    +      (.set_num_executors num-executors)
    +      (.set_id_to_spout_agg_stats spout-agg-stats)
    +      (.set_id_to_bolt_agg_stats bolt-agg-stats)
    +      (.set_topology_stats topology-stats))))
    +
    +(defn agg-topo-execs-stats
    +  "Aggregate various executor statistics for a topology from the given
    +  heartbeats."
    +  [topology-id
    +   exec->node+port
    +   task->component
    +   beats
    +   topology
    +   window
    +   include-sys?
    +   last-err-fn]
    +  (->> ;; This iterates over each executor one time, because of lazy evaluation.
    +    (extract-data-from-hb exec->node+port
    +                          task->component
    +                          beats
    +                          include-sys?
    +                          topology)
    +    (aggregate-topo-stats window include-sys?)
    +    (post-aggregate-topo-stats task->component exec->node+port last-err-fn)
    +    (thriftify-topo-page-data topology-id)))
    +
    +(defn- agg-bolt-exec-win-stats
    +  "A helper function that aggregates windowed stats from one bolt 
executor."
    +  [acc-stats new-stats include-sys?]
    +  (let [{w->execLatWgtAvg :executeLatencyTotal
    +         w->procLatWgtAvg :processLatencyTotal
    +         w->executed :executed}
    +          (swap-map-order
    +            (into {} (for [w (keys (:executed new-stats))]
    +                       [w (agg-bolt-lat-and-count
    +                            (get (:execute-latencies new-stats) w)
    +                            (get (:process-latencies new-stats) w)
    +                            (get (:executed new-stats) w))])))
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {:window->emitted (->> (:emitted new-stats)
    +                           (map-val handle-sys-components-fn)
    +                           aggregate-count-streams
    +                           (merge-with + (:window->emitted acc-stats)))
    +     :window->transferred (->> (:transferred new-stats)
    +                               (map-val handle-sys-components-fn)
    +                               aggregate-count-streams
    +                               (merge-with + (:window->transferred 
acc-stats)))
    +     :window->exec-lat-wgt-avg (merge-with +
    +                                           (:window->exec-lat-wgt-avg 
acc-stats)
    +                                           w->execLatWgtAvg)
    +     :window->proc-lat-wgt-avg (merge-with +
    +                                           (:window->proc-lat-wgt-avg 
acc-stats)
    +                                           w->procLatWgtAvg)
    +     :window->executed (merge-with + (:window->executed acc-stats) 
w->executed)
    +     :window->acked (->> (:acked new-stats)
    +                         aggregate-count-streams
    +                         (merge-with + (:window->acked acc-stats)))
    +     :window->failed (->> (:failed new-stats)
    +                          aggregate-count-streams
    +                          (merge-with + (:window->failed acc-stats)))}))
    +
    +(defn- agg-spout-exec-win-stats
    +  "A helper function that aggregates windowed stats from one spout 
executor."
    +  [acc-stats new-stats include-sys?]
    +  (let [{w->compLatWgtAvg :completeLatencyTotal
    +         w->acked :acked}
    +          (swap-map-order
    +            (into {} (for [w (keys (:acked new-stats))]
    +                       [w (agg-spout-lat-and-count
    +                            (get (:complete-latencies new-stats) w)
    +                            (get (:acked new-stats) w))])))
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {:window->emitted (->> (:emitted new-stats)
    +                           (map-val handle-sys-components-fn)
    +                           aggregate-count-streams
    +                           (merge-with + (:window->emitted acc-stats)))
    +     :window->transferred (->> (:transferred new-stats)
    +                               (map-val handle-sys-components-fn)
    +                               aggregate-count-streams
    +                               (merge-with + (:window->transferred 
acc-stats)))
    +     :window->comp-lat-wgt-avg (merge-with +
    +                                           (:window->comp-lat-wgt-avg 
acc-stats)
    +                                           w->compLatWgtAvg)
    +     :window->acked (->> (:acked new-stats)
    +                         aggregate-count-streams
    +                         (merge-with + (:window->acked acc-stats)))
    +     :window->failed (->> (:failed new-stats)
    +                          aggregate-count-streams
    +                          (merge-with + (:window->failed acc-stats)))}))
    +
    +(defmulti agg-comp-exec-stats
    +  "Combines the aggregate stats of one executor with the given map, 
selecting
    +  the appropriate window and including system components as specified."
    +  (fn dispatch-fn [_ _ init-val _] (:type init-val)))
    +
    +(defmethod agg-comp-exec-stats :bolt
    +  [window include-sys? acc-stats new-data]
    +  (assoc (agg-bolt-exec-win-stats acc-stats (:stats new-data) include-sys?)
    +         :stats (merge-agg-comp-stats-comp-page-bolt
    +                  (:stats acc-stats)
    +                  (agg-pre-merge-comp-page-bolt new-data window 
include-sys?))
    +         :type :bolt))
    +
    +(defmethod agg-comp-exec-stats :spout
    +  [window include-sys? acc-stats new-data]
    +  (assoc (agg-spout-exec-win-stats acc-stats (:stats new-data) 
include-sys?)
    +         :stats (merge-agg-comp-stats-comp-page-spout
    +                  (:stats acc-stats)
    +                  (agg-pre-merge-comp-page-spout new-data window 
include-sys?))
    +         :type :spout))
    +
    +(defn- aggregate-comp-stats*
    +  [window include-sys? data init-val]
    +  (-> (partial agg-comp-exec-stats
    +               window
    +               include-sys?)
    +      (reduce init-val data)))
    +
    +(defmulti aggregate-comp-stats
    +  (fn dispatch-fn [& args] (-> args last first :stats :type)))
    --- End diff --
    
    This dispatch function, `(-> args last first :stats :type)`, could
    produce a nil dispatch value if the component's heartbeat does not
    contain metrics. We should use the `:type` key directly instead of
    accessing it via `:stats`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to