[ 
https://issues.apache.org/jira/browse/STORM-820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14556732#comment-14556732
 ] 

ASF GitHub Bot commented on STORM-820:
--------------------------------------

Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30925867
  
    --- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
    @@ -373,4 +360,1248 @@
         (ExecutorStats. (window-set-converter (:emitted stats) str)
           (window-set-converter (:transferred stats) str)
           specific-stats
    -      rate)))
    \ No newline at end of file
    +      rate)))
    +
    +(defn- agg-bolt-lat-and-count
    +  "Aggregates number executed and process & execute latencies across all
    +  streams."
    +  [idk->exec-avg idk->proc-avg idk->num-executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->num-executed]))}
    +  (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
    +     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
    +     :executed (sum (vals idk->num-executed))}))
    +
    +(defn- agg-spout-lat-and-count
    +  "Aggregates number acked and complete latencies across all streams."
    +  [sid->comp-avg sid->num-acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [sid->comp-avg
    +                       sid->num-acked]))}
    +  (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
    +    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
    +     :acked (sum (vals sid->num-acked))}))
    +
    +(defn add-pairs
    +  ([] [0 0])
    +  ([[a1 a2] [b1 b2]]
    +   [(+ a1 b1) (+ a2 b2)]))
    +
    +(defn mk-include-sys-fn
    +  [include-sys?]
    +  (if include-sys?
    +    (fn [_] true)
    +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
    +
    +(defn mk-include-sys-filter
    +  "Returns a function that includes or excludes map entries whose keys are
    +  system ids."
    +  [include-sys?]
    +  (if include-sys?
    +    identity
    +    (partial filter-key (mk-include-sys-fn false))))
    +
    +(defn- agg-bolt-streams-lat-and-count
    +  "Aggregates number executed and process & execute latencies."
    +  [idk->exec-avg idk->proc-avg idk->executed]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->exec-avg
    +                       idk->proc-avg
    +                       idk->executed]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->exec-avg)]
    +        [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
    +            :processLatencyTotal (weight-avg k (idk->proc-avg k))
    +            :executed (idk->executed k)}]))))
    +
    +(defn- agg-spout-streams-lat-and-count
    +  "Aggregates number acked and complete latencies."
    +  [idk->comp-avg idk->acked]
    +  {:pre (apply = (map #(set (keys %))
    +                      [idk->comp-avg
    +                       idk->acked]))}
    +  (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
    +                                   (if (and avg num-e)
    +                                     (* avg num-e)
    +                                     0)))]
    +    (into {}
    +      (for [k (keys idk->comp-avg)]
    +        [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
    +            :acked (get idk->acked k)}]))))
    +
    +(defn swap-map-order
    +  "{:a {:A 3, :B 5}, :b {:A 1, :B 2}}
    +    -> {:A {:b 1, :a 3}, :B {:b 2, :a 5}}"
    +  [m]
    +  (apply merge-with
    +         merge
    +         (map (fn [[k v]]
    +                (into {}
    +                      (for [[k2 v2] v]
    +                        [k2 {k v2}])))
    +              m)))
    +
    +(defn- compute-agg-capacity
    +  "Computes the capacity metric for one executor given its heartbeat data and
    +  uptime."
    +  [m uptime]
    +  (when uptime
    +    (->>
    +      ;; For each stream, create weighted averages and counts.
    +      (merge-with (fn weighted-avg+count-fn
    +                    [avg cnt]
    +                    [(* avg cnt) cnt])
    +                  (get (:execute-latencies m) "600")
    +                  (get (:executed m) "600"))
    +      vals ;; Ignore the stream ids.
    +      (reduce add-pairs
    +              [0. 0]) ;; Combine weighted averages and counts.
    +      ((fn [[weighted-avg cnt]]
    +        (div weighted-avg (* 1000 (min uptime 600))))))))
    --- End diff --
    
    Same here.


> UI Topology & Component Pages have long load times with large, 
> highly-connected Topologies
> ------------------------------------------------------------------------------------------
>
>                 Key: STORM-820
>                 URL: https://issues.apache.org/jira/browse/STORM-820
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.11.0
>            Reporter: Derek Dagit
>            Assignee: Derek Dagit
>
> In the UI, the Topology Page and the Component Page each make a 
> getTopologyInfoWithOpts thrift call to nimbus for executor heartbeat data. 
> Metrics from this data are then aggregated by the UI daemon for display.
> When large topologies, with high-connectedness, are viewed in this way, the 
> load times for each page can be minutes long.  In addition, heap usage by the 
> nimbus JVM can grow substantially as data for each executor, component, & 
> stream is serialized to be sent to the UI.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to