[ 
https://issues.apache.org/jira/browse/STORM-820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563481#comment-14563481
 ] 

ASF GitHub Bot commented on STORM-820:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r31266476
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +362,1222 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed, process latency, and execute latency across all
    +  streams.
    +  Takes three maps that share the same kind of key: average execute latency,
    +  average process latency, and number executed. Returns a map with
    +  :executeLatencyTotal and :processLatencyTotal (each the sum over keys of
    +  average * number executed) and :executed (the sum of all counts). An entry
    +  whose average or count is missing contributes 0 to the latency totals."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  ;; The former `{:pre (apply = ...)}` condition was a list rather than a
    +  ;; vector, so it expanded into three always-truthy assertions and never
    +  ;; compared the key sets. It is dropped because weight-avg already tolerates
    +  ;; keys that are missing from idk->num-executed.
    +  (letfn [(weight-avg [[id avg]]
    +            (let [num-e (get idk->num-executed id)]
    +              (if (and avg num-e)
    +                (* avg num-e)
    +                0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams.
    +  Takes a map of average complete latency and a map of number acked that
    +  share the same kind of key. Returns a map with :completeLatencyTotal (the
    +  sum over keys of average * number acked) and :acked (the sum of all
    +  counts). An entry whose average or count is missing contributes 0 to the
    +  latency total instead of throwing."
    +  [sid->comp-avg sid->num-acked]
    +  ;; The former `{:pre (apply = ...)}` condition was a list rather than a
    +  ;; vector, so it never compared the key sets; it is dropped. The nil guard
    +  ;; below mirrors agg-bolt-lat-and-count.
    +  (letfn [(weight-avg [[id avg]]
    +            (let [num-acked (get sid->num-acked id)]
    +              (if (and avg num-acked)
    +                (* avg num-acked)
    +                0)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  "Adds two 2-element pairs component-wise. Called with no arguments it
    +  returns the identity pair [0 0]."
    +  ([] [0 0])
    +  ([left right]
    +   (let [[l-first l-second] left
    +         [r-first r-second] right]
    +     [(+ l-first r-first) (+ l-second r-second)])))
    +
    +(defn mk-include-sys-fn
    +  "Returns a one-argument predicate on a stream id. When include-sys? is
    +  truthy the predicate accepts everything; otherwise it accepts only string
    +  ids for which system-id? is false."
    +  [include-sys?]
    +  (letfn [(any-stream? [_] true)
    +          (user-stream? [stream]
    +            (and (string? stream) (not (system-id? stream))))]
    +    (if include-sys? any-stream? user-stream?)))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids: identity when include-sys? is truthy, otherwise filter-key
    +  partially applied to the predicate from (mk-include-sys-fn false)."
    +  [include-sys?]
    +  (if-not include-sys?
    +    (let [user-stream? (mk-include-sys-fn false)]
    +      (partial filter-key user-stream?))
    +    identity))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies.
    +  Takes three maps that share the same kind of key: average execute latency,
    +  average process latency, and number executed. Returns a map from each key
    +  of idk->exec-avg to {:executeLatencyTotal, :processLatencyTotal, :executed},
    +  where each total is average * number executed, or 0 when either factor is
    +  missing."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  ;; The former `{:pre (apply = ...)}` condition was a list rather than a
    +  ;; vector, so it never compared the key sets; it is dropped. Lookups use
    +  ;; `get` (as agg-spout-streams-lat-and-count does) so that a nil map yields
    +  ;; nil instead of a NullPointerException.
    +  (letfn [(weight-avg [id avg]
    +            (let [num-e (get idk->executed id)]
    +              (if (and avg num-e)
    +                (* avg num-e)
    +                0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (get idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (get idk->proc-avg k))
    +            :executed (get idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies.
    +  Takes a map of average complete latency and a map of number acked that
    +  share the same kind of key. Returns a map from each key of idk->comp-avg to
    +  {:completeLatencyTotal, :acked}, where the total is average * number acked,
    +  or 0 when either factor is missing."
    +  [idk->comp-avg idk->acked]
    +  ;; The former `{:pre (apply = ...)}` condition was a list rather than a
    +  ;; vector, so it expanded into always-truthy assertions and never compared
    +  ;; the key sets. It is dropped because weight-avg tolerates missing keys.
    +  (letfn [(weight-avg [id avg]
    +            (let [num-e (get idk->acked id)]
    +              (if (and avg num-e)
    +                (* avg num-e)
    +                0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "Inverts the two key levels of a nested map: the outer keys become the
    +  inner keys and the inner keys become the outer keys. Returns nil when m has
    +  no entries.
    +  Example:
    +  {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
    +  -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}"
    +  [m]
    +  (let [invert-entry (fn [[outer-k inner-m]]
    +                       ;; Builds {inner-k {outer-k v}} for each inner entry.
    +                       (into {}
    +                             (map (fn [[inner-k v]] [inner-k {outer-k v}])
    +                                  inner-m)))]
    +    (apply merge-with merge (map invert-entry m))))
    +
    +(defn- compute-agg-capacity
    +  "Computes the capacity metric for one executor given its heartbeat data and
    +  uptime. Returns nil when uptime is nil. Otherwise sums, over the entries of
    +  the TEN-MIN-IN-SECONDS window, execute latency * executed count and divides
    +  that by 1000 * (min uptime TEN-MIN-IN-SECONDS)."
    +  [m uptime]
    +  (when uptime
    +    (let [window-key (str TEN-MIN-IN-SECONDS)
    +          sid->exec-avg (get (:execute-latencies m) window-key)
    +          sid->executed (get (:executed m) window-key)
    +          ;; For each stream, pair the weighted average with its count.
    +          sid->wgt+cnt (merge-with (fn weighted-avg+count-fn
    +                                     [avg cnt]
    +                                     [(* avg cnt) cnt])
    +                                   sid->exec-avg
    +                                   sid->executed)
    +          ;; The stream ids are ignored; only the combined total is used.
    +          [weighted-total _] (reduce add-pairs [0. 0] (vals sid->wgt+cnt))]
    +      (div weighted-total (* 1000 (min uptime TEN-MIN-IN-SECONDS))))))
    +
    +(defn agg-pre-merge-comp-page-bolt
    +  "Builds the component-page summary map for a single bolt executor from its
    +  extracted heartbeat data, restricted to the given window. Output stats
    +  (:emitted, :transferred) are passed through the filter returned by
    +  mk-include-sys-filter for include-sys?; input stats are not."
    +  [{exec-id :exec-id
    +    host :host
    +    port :port
    +    uptime :uptime
    +    comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [drop-sys-streams (mk-include-sys-filter include-sys?)
    +        ;; One stat's per-stream values for the requested window; the window
    +        ;; keys are run through (map-key str ...) before the lookup.
    +        in-window (fn [statk]
    +                    (get (map-key str (statk statk->w->sid->num)) window))
    +        capacity (compute-agg-capacity statk->w->sid->num uptime)
    +        input-counts (swap-map-order {:acked (in-window :acked)
    +                                      :failed (in-window :failed)})
    +        input-latencies (agg-bolt-streams-lat-and-count
    +                          (in-window :execute-latencies)
    +                          (in-window :process-latencies)
    +                          (in-window :executed))
    +        output-counts (swap-map-order
    +                        {:emitted (drop-sys-streams (in-window :emitted))
    +                         :transferred (drop-sys-streams
    +                                        (in-window :transferred))})]
    +    {:executor-id exec-id
    +     :host host
    +     :port port
    +     :uptime uptime
    +     :num-executors 1
    +     :num-tasks num-tasks
    +     :capacity capacity
    +     :cid+sid->input-stats (merge-with merge input-counts input-latencies)
    +     :sid->output-stats output-counts}))
    +
    +(defn agg-pre-merge-comp-page-spout
    +  "Builds the component-page summary map for a single spout executor from its
    +  extracted heartbeat data, restricted to the given window. :emitted and
    +  :transferred are passed through the filter returned by
    +  mk-include-sys-filter for include-sys?; :acked and :failed are not."
    +  [{exec-id :exec-id
    +    host :host
    +    port :port
    +    uptime :uptime
    +    comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [drop-sys-streams (mk-include-sys-filter include-sys?)
    +        ;; One stat's per-stream values for the requested window; the window
    +        ;; keys are run through (map-key str ...) before the lookup.
    +        in-window (fn [statk]
    +                    (get (map-key str (statk statk->w->sid->num)) window))
    +        latency-stats (agg-spout-streams-lat-and-count
    +                        (in-window :complete-latencies)
    +                        (in-window :acked))
    +        count-stats (swap-map-order
    +                      {:acked (in-window :acked)
    +                       :failed (in-window :failed)
    +                       :emitted (drop-sys-streams (in-window :emitted))
    +                       :transferred (drop-sys-streams
    +                                      (in-window :transferred))})]
    +    {:executor-id exec-id
    +     :host host
    +     :port port
    +     :uptime uptime
    +     :num-executors 1
    +     :num-tasks num-tasks
    +     :sid->output-stats (merge-with merge latency-stats count-stats)}))
    +
    +(defn agg-pre-merge-topo-page-bolt
    +  "Builds a single-entry map {comp-id stats} summarising one bolt executor
    +  for the topology page, restricted to the given window. :emitted and
    +  :transferred are summed after applying the filter returned by
    +  mk-include-sys-filter for include-sys?; :acked and :failed are summed over
    +  all entries."
    +  [{comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats
    +    uptime :uptime}
    +   window
    +   include-sys?]
    +  (let [drop-sys-streams (mk-include-sys-filter include-sys?)
    +        ;; One stat's per-stream values for the requested window; the window
    +        ;; keys are run through (map-key str ...) before the lookup.
    +        in-window (fn [statk]
    +                    (get (map-key str (statk statk->w->sid->num)) window))
    +        total (fn [sid->num] (sum (vals sid->num)))
    +        latency-stats (agg-bolt-lat-and-count
    +                        (in-window :execute-latencies)
    +                        (in-window :process-latencies)
    +                        (in-window :executed))]
    +    {comp-id
    +     (merge
    +       latency-stats
    +       {:num-executors 1
    +        :num-tasks num-tasks
    +        :emitted (total (drop-sys-streams (in-window :emitted)))
    +        :transferred (total (drop-sys-streams (in-window :transferred)))
    +        :capacity (compute-agg-capacity statk->w->sid->num uptime)
    +        :acked (total (in-window :acked))
    +        :failed (total (in-window :failed))})}))
    +
    +(defn agg-pre-merge-topo-page-spout
    +  "Builds a single-entry map {comp-id stats} summarising one spout executor
    +  for the topology page, restricted to the given window. :emitted and
    +  :transferred are summed after applying the filter returned by
    +  mk-include-sys-filter for include-sys?; :failed is summed over all
    +  entries."
    +  [{comp-id :comp-id
    +    num-tasks :num-tasks
    +    statk->w->sid->num :stats}
    +   window
    +   include-sys?]
    +  (let [drop-sys-streams (mk-include-sys-filter include-sys?)
    +        ;; One stat's per-stream values for the requested window; the window
    +        ;; keys are run through (map-key str ...) before the lookup.
    +        in-window (fn [statk]
    +                    (get (map-key str (statk statk->w->sid->num)) window))
    +        total (fn [sid->num] (sum (vals sid->num)))
    +        latency-stats (agg-spout-lat-and-count
    +                        (in-window :complete-latencies)
    +                        (in-window :acked))]
    +    {comp-id
    +     (merge
    +       latency-stats
    +       {:num-executors 1
    +        :num-tasks num-tasks
    +        :emitted (total (drop-sys-streams (in-window :emitted)))
    +        :transferred (total (drop-sys-streams (in-window :transferred)))
    +        :failed (total (in-window :failed))})}))
    +
    +(defn apply-default
    +  "Calls f with args after passing every argument through defaulting-fn."
    +  [f defaulting-fn & args]
    +  (->> args
    +       (map defaulting-fn)
    +       (apply f)))
    +
    +(defn apply-or-0
    +  "Calls f with args, substituting 0 for every nil or false argument."
    +  [f & args]
    +  (let [zero-if-falsey (fn [v] (if v v 0))]
    +    (apply apply-default f zero-if-falsey args)))
    +
    +(defn sum-or-0
    +  "Adds the arguments via apply-or-0, so nil or false arguments count as 0."
    +  [& args]
    +  (let [add-or-0 (partial apply-or-0 +)]
    +    (apply add-or-0 args)))
    +
    +(defn max-or-0
    +  "Returns the max of the arguments via apply-or-0, so nil or false arguments
    +  count as 0."
    +  [& args]
    +  (let [greatest-or-0 (partial apply-or-0 max)]
    +    (apply greatest-or-0 args)))
    +
    +(defn merge-agg-comp-stats-comp-page-bolt
    +  "Folds one bolt executor's component-page stats into the accumulated stats.
    +  Per-stream input and output stats are summed with sum-or-0, :num-executors
    +  is incremented, and a summary of this executor is conj'ed onto
    +  :executor-stats. The summary's :execute-latency and :process-latency are
    +  nil unless the executor's executed total is positive."
    +  [{acc-in :cid+sid->input-stats
    +    acc-out :sid->output-stats
    +    :as acc-bolt-stats}
    +   {bolt-in :cid+sid->input-stats
    +    bolt-out :sid->output-stats
    +    :as bolt-stats}]
    +  (let [merge-streams (partial merge-with (partial merge-with sum-or-0))
    +        ;; Total of stat k over every entry of one stats map.
    +        stream-total (fn [sid->stats k]
    +                       (apply sum-or-0 (map k (vals sid->stats))))
    +        executed (stream-total bolt-in :executed)
    +        has-executed? (and executed (pos? executed))
    +        exec-summary
    +        (merge
    +          (select-keys bolt-stats
    +                       [:executor-id :uptime :host :port :capacity])
    +          {:emitted (stream-total bolt-out :emitted)
    +           :transferred (stream-total bolt-out :transferred)
    +           :acked (stream-total bolt-in :acked)
    +           :failed (stream-total bolt-in :failed)
    +           :executed executed}
    +          {:execute-latency
    +           (when has-executed?
    +             (div (stream-total bolt-in :executeLatencyTotal) executed))
    +           :process-latency
    +           (when has-executed?
    +             (div (stream-total bolt-in :processLatencyTotal) executed))})]
    +    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
    +     :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats)
    +                          (:num-tasks bolt-stats))
    +     :sid->output-stats (merge-streams acc-out bolt-out)
    +     :cid+sid->input-stats (merge-streams acc-in bolt-in)
    +     :executor-stats (conj (:executor-stats acc-bolt-stats) exec-summary)}))
    +
    +(defn merge-agg-comp-stats-comp-page-spout
    +  "Folds one spout executor's component-page stats into the accumulated
    +  stats. Per-stream output stats are summed with sum-or-0, :num-executors is
    +  incremented, and a summary of this executor is conj'ed onto
    +  :executor-stats. The summary's :complete-latency is nil unless the
    +  executor's acked total is positive."
    +  [{acc-out :sid->output-stats
    +    :as acc-spout-stats}
    +   {spout-out :sid->output-stats
    +    :as spout-stats}]
    +  (let [;; Total of stat k over every entry of this executor's output stats.
    +        stream-total (fn [k]
    +                       (apply sum-or-0 (map k (vals spout-out))))
    +        acked (stream-total :acked)
    +        exec-summary
    +        (merge
    +          (select-keys spout-stats [:executor-id :uptime :host :port])
    +          {:emitted (stream-total :emitted)
    +           :transferred (stream-total :transferred)
    +           :acked acked
    +           :failed (stream-total :failed)}
    +          {:complete-latency
    +           (when (and acked (pos? acked))
    +             (div (stream-total :completeLatencyTotal) acked))})]
    +    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
    +     :num-tasks (sum-or-0 (:num-tasks acc-spout-stats)
    +                          (:num-tasks spout-stats))
    +     :sid->output-stats (merge-with (partial merge-with sum-or-0)
    +                                    acc-out
    +                                    spout-out)
    +     :executor-stats (conj (:executor-stats acc-spout-stats) exec-summary)}))
    +
    +(defn merge-agg-comp-stats-topo-page-bolt
    +  "Folds one bolt executor's topology-page stats into the accumulated stats
    +  for its component. Counts and latency totals are added with sum-or-0,
    +  :capacity keeps the larger value, and :num-executors is incremented."
    +  [acc-bolt-stats bolt-stats]
    +  (let [sum-of (fn [k] (sum-or-0 (k acc-bolt-stats) (k bolt-stats)))]
    +    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
    +     :num-tasks (sum-of :num-tasks)
    +     :emitted (sum-of :emitted)
    +     :transferred (sum-of :transferred)
    +     :capacity (max-or-0 (:capacity acc-bolt-stats) (:capacity bolt-stats))
    +     ;; Latency totals are summed here so no division happens per step; the
    +     ;; average is obtained later by dividing the total by the count.
    +     :executeLatencyTotal (sum-of :executeLatencyTotal)
    +     :processLatencyTotal (sum-of :processLatencyTotal)
    +     :executed (sum-of :executed)
    +     :acked (sum-of :acked)
    +     :failed (sum-of :failed)}))
    +
    +(defn merge-agg-comp-stats-topo-page-spout
    +  "Folds one spout executor's topology-page stats into the accumulated stats
    +  for its component. Counts and the latency total are added with sum-or-0,
    +  and :num-executors is incremented."
    +  [acc-spout-stats spout-stats]
    +  (let [sum-of (fn [k] (sum-or-0 (k acc-spout-stats) (k spout-stats)))]
    +    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
    +     :num-tasks (sum-of :num-tasks)
    +     :emitted (sum-of :emitted)
    +     :transferred (sum-of :transferred)
    +     ;; The latency total is summed here so no division happens per step; the
    +     ;; average is obtained later by dividing the total by the count.
    +     :completeLatencyTotal (sum-of :completeLatencyTotal)
    +     :acked (sum-of :acked)
    +     :failed (sum-of :failed)}))
    +
    +(defn aggregate-count-streams
    +  "Applies map-val to stats so that each value (itself a map of counts) is
    +  replaced by the sum of its own values."
    +  [stats]
    +  (let [total-streams (fn [sid->num] (reduce + (vals sid->num)))]
    +    (map-val total-streams stats)))
    +
    +(defn- agg-topo-exec-stats*
    +  "A helper function that does the common work to aggregate stats of one
    +  executor with the given map for the topology page.
    +  acc-stats is the accumulator and new-data is one executor's extracted
    +  heartbeat data. pre-merge-fn turns new-data into a {comp-id stats} map,
    +  merge-fn combines that with the stats already stored under comp-key
    +  (:bolt-id->stats or :spout-id->stats). Returns the updated accumulator with
    +  :type set to the executor's stats type."
    +  [window
    +   include-sys?
    +   {:keys [workers-set
    +           bolt-id->stats
    +           spout-id->stats
    +           window->emitted
    +           window->transferred
    +           window->comp-lat-wgt-avg
    +           window->acked
    +           window->failed] :as acc-stats}
    +   {:keys [stats] :as new-data}
    +   pre-merge-fn
    +   merge-fn
    +   comp-key]
    +  (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
    +        {w->compLatWgtAvg :completeLatencyTotal
    +         w->acked :acked}
    +          (if (:complete-latencies stats)
    +            (swap-map-order
    +              (into {}
    +                    (for [w (keys (:acked stats))]
    +                      [w (agg-spout-lat-and-count
    +                           (get (:complete-latencies stats) w)
    +                           (get (:acked stats) w))])))
    +            ;; This key has to be :acked to match the destructuring above.
    +            ;; It used to be :acks, which left w->acked nil whenever the
    +            ;; stats carried no :complete-latencies.
    +            {:completeLatencyTotal nil
    +             :acked (aggregate-count-streams (:acked stats))})
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    (assoc {:workers-set (conj workers-set
    +                               [(:host new-data) (:port new-data)])
    +            :bolt-id->stats bolt-id->stats
    +            :spout-id->stats spout-id->stats
    +            :window->emitted (->> (:emitted stats)
    +                                  (map-val handle-sys-components-fn)
    +                                  aggregate-count-streams
    +                                  (merge-with + window->emitted))
    +            :window->transferred (->> (:transferred stats)
    +                                      (map-val handle-sys-components-fn)
    +                                      aggregate-count-streams
    +                                      (merge-with + window->transferred))
    +            :window->comp-lat-wgt-avg (merge-with +
    +                                                  window->comp-lat-wgt-avg
    +                                                  w->compLatWgtAvg)
    +            ;; Topology-level acked/failed totals only count spouts.
    +            :window->acked (if (= :spout (:type stats))
    +                             (merge-with + window->acked w->acked)
    +                             window->acked)
    +            :window->failed (if (= :spout (:type stats))
    +                              (->> (:failed stats)
    +                                   aggregate-count-streams
    +                                   (merge-with + window->failed))
    +                              window->failed)}
    +           comp-key (merge-with merge-fn
    +                                (acc-stats comp-key)
    +                                cid->statk->num)
    +           :type (:type stats))))
    +
    +;; Dispatch value is the :type of the last argument (the new executor data).
    +(defmulti agg-topo-exec-stats
    +  "Combines the aggregate stats of one executor with the given map, selecting
    +  the appropriate window and including system components as specified."
    +  (fn dispatch-fn [& args] (:type (last args))))
    +
    +;; Bolt executors: pre-merge and merge with the bolt topology-page functions
    +;; and store the result under :bolt-id->stats.
    +(defmethod agg-topo-exec-stats :bolt
    +  [window include-sys? acc-stats new-data]
    +  (agg-topo-exec-stats* window
    +                        include-sys?
    +                        acc-stats
    +                        new-data
    +                        agg-pre-merge-topo-page-bolt
    +                        merge-agg-comp-stats-topo-page-bolt
    +                        :bolt-id->stats))
    +
    +;; Spout executors: pre-merge and merge with the spout topology-page functions
    +;; and store the result under :spout-id->stats.
    +(defmethod agg-topo-exec-stats :spout
    +  [window include-sys? acc-stats new-data]
    +  (agg-topo-exec-stats* window
    +                        include-sys?
    +                        acc-stats
    +                        new-data
    +                        agg-pre-merge-topo-page-spout
    +                        merge-agg-comp-stats-topo-page-spout
    +                        :spout-id->stats))
    +
    +;; Any other (or missing) type leaves the accumulated stats untouched.
    +(defmethod agg-topo-exec-stats :default [_ _ acc-stats _] acc-stats)
    +
    +(defn get-last-error
    +  "Returns an ErrorInfo built from the :error and :time-secs of the value that
    +  .last-error yields for the given topology id and component id, or nil when
    +  .last-error yields nothing."
    +  [storm-cluster-state storm-id component-id]
    +  (when-let [err (.last-error storm-cluster-state storm-id component-id)]
    +    (ErrorInfo. (:error err) (:time-secs err))))
    +
    +(defn component-type
    +  "Returns the component type (either :bolt or :spout) for a given
    +  topology and component id. Returns nil if not found."
    +  [^StormTopology topology id]
    +  (let [bolts (.get_bolts topology)
    +        spouts (.get_spouts topology)]
    +    (if (.containsKey bolts id)
    +      :bolt
    +      (when (.containsKey spouts id)
    +        :spout))))
    +
    +(defn extract-data-from-hb
    +  "Returns a lazy sequence with one map per executor in exec->host+port,
    +  holding :exec-id, :comp-id, :num-tasks, :host, :port, :uptime, :stats and
    +  :type. Executors are skipped when comp-id is non-nil and differs from the
    +  executor's component, or when include-sys? is falsey and the component is a
    +  system id. :type comes from the heartbeat stats when present, otherwise
    +  from component-type. The five-argument arity passes nil for comp-id."
    +  ([exec->host+port task->component beats include-sys? topology]
    +   (extract-data-from-hb exec->host+port
    +                         task->component
    +                         beats
    +                         include-sys?
    +                         topology
    +                         nil))
    +  ([exec->host+port task->component beats include-sys? topology comp-id]
    +   (for [[[start end :as executor] [host port]] exec->host+port
    +         :let [beat (beats executor)
    +               id (task->component start)
    +               wanted-comp? (or (nil? comp-id) (= comp-id id))]
    +         :when (and wanted-comp?
    +                    (or include-sys? (not (system-id? id))))]
    +     (let [hb-stats (:stats beat)]
    +       {:exec-id executor
    +        :comp-id id
    +        :num-tasks (count (range start (inc end)))
    +        :host host
    +        :port port
    +        :uptime (:uptime beat)
    +        :stats hb-stats
    +        :type (or (:type hb-stats)
    +                  (component-type topology id))}))))
    +
    +(defn aggregate-topo-stats
    +  "Reduces the sequence of per-executor data into a single accumulator map by
    +  calling agg-topo-exec-stats with window and include-sys? for each element,
    +  starting from empty collections."
    +  [window include-sys? data]
    +  (let [empty-acc {:workers-set #{}
    +                   :bolt-id->stats {}
    +                   :spout-id->stats {}
    +                   :window->emitted {}
    +                   :window->transferred {}
    +                   :window->comp-lat-wgt-avg {}
    +                   :window->acked {}
    +                   :window->failed {}}]
    +    (reduce (fn [acc exec-data]
    +              (agg-topo-exec-stats window include-sys? acc exec-data))
    +            empty-acc
    +            data)))
    +
    +(defn- compute-weighted-averages-per-window
    +  "Returns a map from (str window) to the weighted total stored under
    +  wgt-avg-key divided by the count stored under divisor-key for that window.
    +  Windows whose count is missing or not positive are left out."
    +  [acc-data wgt-avg-key divisor-key]
    +  (let [window-avg (fn [[window wgt-avg]]
    +                     (let [divisor ((divisor-key acc-data) window)]
    +                       (when (and divisor (pos? divisor))
    +                         [(str window) (div wgt-avg divisor)])))]
    +    (into {} (keep window-avg (wgt-avg-key acc-data)))))
    +
    +(defn- post-aggregate-topo-stats
    +  "Turns the accumulated topology stats into the final topology-page data.
    +  Adds task, worker and executor counts; converts each component's latency
    +  totals into averages (0 when nothing was executed/acked) and drops the
    +  totals; attaches :lastError from last-err-fn; and string-keys the
    +  per-window maps."
    +  [task->component exec->node+port last-err-fn acc-data]
    +  {:num-tasks (count task->component)
    +   :num-workers (count (:workers-set acc-data))
    +   :num-executors (count exec->node+port)
    +   :bolt-id->stats
    +     (into {} (for [[id m] (:bolt-id->stats acc-data)
    +                    :let [executed (:executed m)]]
    +                [id (-> m
    +                        (assoc :execute-latency
    +                               (if (and executed (pos? executed))
    +                                 (div (or (:executeLatencyTotal m) 0)
    +                                      executed)
    +                                 0)
    +                               :process-latency
    +                               (if (and executed (pos? executed))
    +                                 (div (or (:processLatencyTotal m) 0)
    +                                      executed)
    +                                 0))
    +                        (dissoc :executeLatencyTotal
    +                                :processLatencyTotal)
    +                        (assoc :lastError (last-err-fn id)))]))
    +   :spout-id->stats
    +     (into {} (for [[id m] (:spout-id->stats acc-data)
    +                    :let [acked (:acked m)]]
    +                ;; The key is :complete-latency because that is what
    +                ;; thriftify-spout-agg-stats reads; under the former
    +                ;; :completeLatency key the value was never picked up.
    +                [id (-> m
    +                        (assoc :complete-latency
    +                               (if (and acked (pos? acked))
    +                                 (div (or (:completeLatencyTotal m) 0)
    +                                      acked)
    +                                 0))
    +                        (dissoc :completeLatencyTotal)
    +                        (assoc :lastError (last-err-fn id)))]))
    +   :window->emitted (map-key str (:window->emitted acc-data))
    +   :window->transferred (map-key str (:window->transferred acc-data))
    +   :window->complete-latency
    +     (compute-weighted-averages-per-window acc-data
    +                                           :window->comp-lat-wgt-avg
    +                                           :window->acked)
    +   :window->acked (map-key str (:window->acked acc-data))
    +   :window->failed (map-key str (:window->failed acc-data))})
    +
    +(defn- thriftify-common-agg-stats
    +  "Creates a CommonAggregateStats, sets each field whose value in statk->num
    +  is truthy, and installs it on s with .set_common_stats."
    +  [^ComponentAggregateStats s
    +   {:keys [num-tasks
    +           emitted
    +           transferred
    +           acked
    +           failed
    +           num-executors] :as statk->num}]
    +  (let [cas (CommonAggregateStats.)]
    +    (when num-executors (.set_num_executors cas num-executors))
    +    (when num-tasks (.set_num_tasks cas num-tasks))
    +    (when emitted (.set_emitted cas emitted))
    +    (when transferred (.set_transferred cas transferred))
    +    (when acked (.set_acked cas acked))
    +    (when failed (.set_failed cas failed))
    +    (.set_common_stats s cas)))
    +
    +(defn thriftify-bolt-agg-stats
    +  "Builds a ComponentAggregateStats of type BOLT from statk->num. The last
    +  error, the common stats and the bolt-specific stats (execute latency,
    +  process latency, executed, capacity) are set only for truthy values."
    +  [statk->num]
    +  (let [{:keys [lastError
    +                execute-latency
    +                process-latency
    +                executed
    +                capacity]} statk->num
    +        s (ComponentAggregateStats.)]
    +    (.set_type s ComponentType/BOLT)
    +    (when lastError (.set_last_error s lastError))
    +    (thriftify-common-agg-stats s statk->num)
    +    (let [bas (BoltAggregateStats.)]
    +      (when execute-latency (.set_execute_latency_ms bas execute-latency))
    +      (when process-latency (.set_process_latency_ms bas process-latency))
    +      (when executed (.set_executed bas executed))
    +      (when capacity (.set_capacity bas capacity))
    +      (.set_specific_stats s (SpecificAggregateStats/bolt bas)))
    +    s))
    +
    +(defn thriftify-spout-agg-stats
    +  "Builds a ComponentAggregateStats of type SPOUT from statk->num. The last
    +  error, the common stats and the spout-specific complete latency are set
    +  only for truthy values."
    +  [statk->num]
    +  (let [{:keys [lastError
    +                complete-latency]} statk->num
    +        s (ComponentAggregateStats.)]
    +    (.set_type s ComponentType/SPOUT)
    +    (when lastError (.set_last_error s lastError))
    +    (thriftify-common-agg-stats s statk->num)
    +    (let [sas (SpoutAggregateStats.)]
    +      (when complete-latency
    +        (.set_complete_latency_ms sas complete-latency))
    +      (.set_specific_stats s (SpecificAggregateStats/spout sas)))
    +    s))
    +
    +(defn thriftify-topo-page-data
    +  "Converts the topology-page data map into a TopologyPageInfo for
    +  topology-id: per-component stats become ComponentAggregateStats and the
    +  per-window maps become a TopologyStats."
    +  [topology-id data]
    +  (let [{:keys [num-tasks
    +                num-workers
    +                num-executors
    +                spout-id->stats
    +                bolt-id->stats
    +                window->emitted
    +                window->transferred
    +                window->complete-latency
    +                window->acked
    +                window->failed]} data
    +        ;; Tags each stats map with its component type and thriftifies it.
    +        thriftify-comps (fn [comp-type thriftify-fn id->stats]
    +                          (into {}
    +                                (for [[id m] id->stats]
    +                                  [id (thriftify-fn
    +                                        (assoc m :type comp-type))])))
    +        spout-agg-stats (thriftify-comps :spout
    +                                         thriftify-spout-agg-stats
    +                                         spout-id->stats)
    +        bolt-agg-stats (thriftify-comps :bolt
    +                                        thriftify-bolt-agg-stats
    +                                        bolt-id->stats)
    +        topology-stats (doto (TopologyStats.)
    +                         (.set_window_to_emitted window->emitted)
    +                         (.set_window_to_transferred window->transferred)
    +                         (.set_window_to_complete_latencies_ms
    +                           window->complete-latency)
    +                         (.set_window_to_acked window->acked)
    +                         (.set_window_to_failed window->failed))]
    +    (doto (TopologyPageInfo. topology-id)
    +      (.set_num_tasks num-tasks)
    +      (.set_num_workers num-workers)
    +      (.set_num_executors num-executors)
    +      (.set_id_to_spout_agg_stats spout-agg-stats)
    +      (.set_id_to_bolt_agg_stats bolt-agg-stats)
    +      (.set_topology_stats topology-stats))))
    +
    +(defn agg-topo-execs-stats
    +  "Aggregate various executor statistics for a topology from the given
    +  heartbeats."
    +  [topology-id
    +   exec->node+port
    +   task->component
    +   beats
    +   topology
    +   window
    +   include-sys?
    +   last-err-fn]
    +  (->> ;; This iterates over each executor one time, because of lazy 
evaluation.
    +    (extract-data-from-hb exec->node+port
    +                          task->component
    +                          beats
    +                          include-sys?
    +                          topology)
    +    (aggregate-topo-stats window include-sys?)
    +    (post-aggregate-topo-stats task->component exec->node+port last-err-fn)
    +    (thriftify-topo-page-data topology-id)))
    +
    +(defn- agg-bolt-exec-win-stats
    +  "A helper function that aggregates windowed stats from one bolt 
executor."
    +  [acc-stats new-stats include-sys?]
    +  (let [{w->execLatWgtAvg :executeLatencyTotal
    +         w->procLatWgtAvg :processLatencyTotal
    +         w->executed :executed}
    +          (swap-map-order
    +            (into {} (for [w (keys (:executed new-stats))]
    +                       [w (agg-bolt-lat-and-count
    +                            (get (:execute-latencies new-stats) w)
    +                            (get (:process-latencies new-stats) w)
    +                            (get (:executed new-stats) w))])))
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {:window->emitted (->> (:emitted new-stats)
    +                           (map-val handle-sys-components-fn)
    +                           aggregate-count-streams
    +                           (merge-with + (:window->emitted acc-stats)))
    +     :window->transferred (->> (:transferred new-stats)
    +                               (map-val handle-sys-components-fn)
    +                               aggregate-count-streams
    +                               (merge-with + (:window->transferred 
acc-stats)))
    +     :window->exec-lat-wgt-avg (merge-with +
    +                                           (:window->exec-lat-wgt-avg 
acc-stats)
    +                                           w->execLatWgtAvg)
    +     :window->proc-lat-wgt-avg (merge-with +
    +                                           (:window->proc-lat-wgt-avg 
acc-stats)
    +                                           w->procLatWgtAvg)
    +     :window->executed (merge-with + (:window->executed acc-stats) 
w->executed)
    +     :window->acked (->> (:acked new-stats)
    +                         aggregate-count-streams
    +                         (merge-with + (:window->acked acc-stats)))
    +     :window->failed (->> (:failed new-stats)
    +                          aggregate-count-streams
    +                          (merge-with + (:window->failed acc-stats)))}))
    +
    +(defn- agg-spout-exec-win-stats
    +  "A helper function that aggregates windowed stats from one spout 
executor."
    +  [acc-stats new-stats include-sys?]
    +  (let [{w->compLatWgtAvg :completeLatencyTotal
    +         w->acked :acked}
    +          (swap-map-order
    +            (into {} (for [w (keys (:acked new-stats))]
    +                       [w (agg-spout-lat-and-count
    +                            (get (:complete-latencies new-stats) w)
    +                            (get (:acked new-stats) w))])))
    +        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    +    {:window->emitted (->> (:emitted new-stats)
    +                           (map-val handle-sys-components-fn)
    +                           aggregate-count-streams
    +                           (merge-with + (:window->emitted acc-stats)))
    +     :window->transferred (->> (:transferred new-stats)
    +                               (map-val handle-sys-components-fn)
    +                               aggregate-count-streams
    +                               (merge-with + (:window->transferred 
acc-stats)))
    +     :window->comp-lat-wgt-avg (merge-with +
    +                                           (:window->comp-lat-wgt-avg 
acc-stats)
    +                                           w->compLatWgtAvg)
    +     :window->acked (->> (:acked new-stats)
    +                         aggregate-count-streams
    +                         (merge-with + (:window->acked acc-stats)))
    +     :window->failed (->> (:failed new-stats)
    +                          aggregate-count-streams
    +                          (merge-with + (:window->failed acc-stats)))}))
    +
    +(defmulti agg-comp-exec-stats
    +  "Combines the aggregate stats of one executor with the given map, 
selecting
    +  the appropriate window and including system components as specified."
    +  (fn dispatch-fn [_ _ init-val _] (:type init-val)))
    +
    +(defmethod agg-comp-exec-stats :bolt
    +  [window include-sys? acc-stats new-data]
    +  (assoc (agg-bolt-exec-win-stats acc-stats (:stats new-data) include-sys?)
    +         :stats (merge-agg-comp-stats-comp-page-bolt
    +                  (:stats acc-stats)
    +                  (agg-pre-merge-comp-page-bolt new-data window 
include-sys?))
    +         :type :bolt))
    +
    +(defmethod agg-comp-exec-stats :spout
    +  [window include-sys? acc-stats new-data]
    +  (assoc (agg-spout-exec-win-stats acc-stats (:stats new-data) 
include-sys?)
    +         :stats (merge-agg-comp-stats-comp-page-spout
    +                  (:stats acc-stats)
    +                  (agg-pre-merge-comp-page-spout new-data window 
include-sys?))
    +         :type :spout))
    +
    +(defn- aggregate-comp-stats*
    +  [window include-sys? data init-val]
    +  (-> (partial agg-comp-exec-stats
    +               window
    +               include-sys?)
    +      (reduce init-val data)))
    +
    +(defmulti aggregate-comp-stats
    +  (fn dispatch-fn [& args] (-> args last first :stats :type)))
    --- End diff --
    
    This could result in a null dispatch value if the component's heartbeat 
does not contain metrics.  We should be using the `:type` directly instead of 
accessing it via `:stats`. 


> UI Topology & Component Pages have long load times with large, 
> highly-connected Topologies
> ------------------------------------------------------------------------------------------
>
>                 Key: STORM-820
>                 URL: https://issues.apache.org/jira/browse/STORM-820
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.11.0
>            Reporter: Derek Dagit
>            Assignee: Derek Dagit
>
> In the UI, the Topology Page and the Component Page each make a 
> getTopologyInfoWithOpts thrift call to nimbus for executor heartbeat data. 
> Metrics from this data are then aggregated by the UI daemon for display.
> When large topologies, with high-connectedness, are viewed in this way, the 
> load times for each page can be minutes long.  In addition, heap usage by the 
> nimbus JVM can grow substantially as data for each executor, component, & 
> stream is serialized to be sent to the UI.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to