[
https://issues.apache.org/jira/browse/STORM-820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14556599#comment-14556599
]
ASF GitHub Bot commented on STORM-820:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/554#discussion_r30920821
--- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
@@ -373,4 +360,1248 @@
(ExecutorStats. (window-set-converter (:emitted stats) str)
(window-set-converter (:transferred stats) str)
specific-stats
- rate)))
\ No newline at end of file
+ rate)))
+
+(defn- agg-bolt-lat-and-count
+  "Aggregates the number executed and the process & execute latencies across
+  all streams of a bolt.
+
+  Each stream's average latency is weighted by that stream's executed count,
+  so :executeLatencyTotal and :processLatencyTotal are sums of avg * count and
+  :executed is the sum of all executed counts. An entry whose average or
+  executed count is missing contributes 0 to the latency totals."
+  [idk->exec-avg idk->proc-avg idk->num-executed]
+  ;; The key sets of the three maps are not asserted equal; weight-avg treats
+  ;; a missing average or count as 0 instead.
+  (letfn [(weight-avg [[id avg]]
+            (let [num-e (get idk->num-executed id)]
+              (if (and avg num-e)
+                (* avg num-e)
+                0)))]
+    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
+     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
+     :executed (sum (vals idk->num-executed))}))
+
+(defn- agg-spout-lat-and-count
+  "Aggregates the number acked and the complete latencies across all streams
+  of a spout.
+
+  Each stream's average complete latency is weighted by that stream's acked
+  count, so :completeLatencyTotal is a sum of avg * acked and :acked is the
+  sum of all acked counts. A stream whose average or acked count is missing
+  contributes 0 to the latency total instead of throwing."
+  [sid->comp-avg sid->num-acked]
+  ;; The key sets are not asserted equal; weight-avg guards against a nil
+  ;; average or a stream absent from sid->num-acked.
+  (letfn [(weight-avg [[id avg]]
+            (let [num-a (get sid->num-acked id)]
+              (if (and avg num-a)
+                (* avg num-a)
+                0)))]
+    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
+     :acked (sum (vals sid->num-acked))}))
+
+(defn add-pairs
+  "Adds two pairs element-wise: [a1 a2] and [b1 b2] give [(+ a1 b1) (+ a2 b2)].
+  With no arguments it returns the identity pair [0 0], so it can be used
+  directly as a reducing function."
+  ([] [0 0])
+  ([pair-a pair-b]
+   (let [[a-first a-second] pair-a
+         [b-first b-second] pair-b]
+     [(+ a-first b-first) (+ a-second b-second)])))
+
+(defn mk-include-sys-fn
+  "Returns a one-argument predicate over stream ids.
+
+  When include-sys? is truthy the predicate accepts every stream; otherwise it
+  accepts only string ids for which system-id? is false."
+  [include-sys?]
+  (if-not include-sys?
+    (fn [stream-id]
+      (and (string? stream-id)
+           (not (system-id? stream-id))))
+    (fn [_] true)))
+
+(defn mk-include-sys-filter
+  "Returns a function that includes or excludes map entries whose keys are
+  system ids.
+
+  With include-sys? truthy the result is identity; otherwise it is filter-key
+  partially applied to the non-system-stream predicate."
+  [include-sys?]
+  (if-not include-sys?
+    (let [non-sys-key? (mk-include-sys-fn false)]
+      (partial filter-key non-sys-key?))
+    identity))
+
+(defn- agg-bolt-streams-lat-and-count
+  "Computes per-stream latency totals and executed counts for a bolt.
+
+  Returns a map from each key of idk->exec-avg to
+  {:executeLatencyTotal :processLatencyTotal :executed}, where each latency
+  total is the stream's average latency weighted by its executed count. A
+  missing average or executed count yields a total of 0. :executed is taken
+  as-is, so it is nil when the key is absent from idk->executed."
+  [idk->exec-avg idk->proc-avg idk->executed]
+  ;; The key sets are not asserted equal; only keys of idk->exec-avg are
+  ;; reported. `get` is used so a nil map behaves like an empty one.
+  (letfn [(weight-avg [id avg]
+            (let [num-e (get idk->executed id)]
+              (if (and avg num-e)
+                (* avg num-e)
+                0)))]
+    (into {}
+          (for [[k exec-avg] idk->exec-avg]
+            [k {:executeLatencyTotal (weight-avg k exec-avg)
+                :processLatencyTotal (weight-avg k (get idk->proc-avg k))
+                :executed (get idk->executed k)}]))))
+
+(defn- agg-spout-streams-lat-and-count
+  "Computes per-stream complete-latency totals and acked counts for a spout.
+
+  Returns a map from each key of idk->comp-avg to
+  {:completeLatencyTotal :acked}. :completeLatencyTotal is the stream's
+  average complete latency weighted by its acked count, or 0 when either is
+  missing. :acked is taken as-is, so it is nil when the key is absent from
+  idk->acked."
+  [idk->comp-avg idk->acked]
+  ;; The key sets are not asserted equal; only keys of idk->comp-avg are
+  ;; reported.
+  (letfn [(weight-avg [id avg]
+            (let [num-a (get idk->acked id)]
+              (if (and avg num-a)
+                (* avg num-a)
+                0)))]
+    (into {}
+          (for [[k comp-avg] idk->comp-avg]
+            [k {:completeLatencyTotal (weight-avg k comp-avg)
+                :acked (get idk->acked k)}]))))
+
+(defn swap-map-order
+  "Swaps the nesting order of a map of maps: inner keys become outer keys and
+  outer keys become inner keys. Returns nil for an empty input.
+
+  {:a {:A 3, :B 5}, :b {:A 1, :B 2}}
+  -> {:A {:b 1, :a 3}, :B {:b 2, :a 5}}"
+  [m]
+  (->> m
+       (map (fn [[outer-k inner-m]]
+              (into {} (map (fn [[inner-k v]] [inner-k {outer-k v}]) inner-m))))
+       (apply merge-with merge)))
+
+(defn- compute-agg-capacity
+  "Computes the capacity metric for one executor given its heartbeat data and
+  uptime.
+
+  Reads the \"600\" window of the :execute-latencies and :executed maps in m.
+  For every stream present in both with non-nil values, it multiplies the
+  average execute latency by the executed count and sums the products. The
+  sum is then divided by (* 1000 (min uptime 600)).
+
+  Streams present in only one of the two maps are skipped rather than causing
+  an exception. Returns nil when uptime is nil."
+  [m uptime]
+  (when uptime
+    (let [window "600"
+          stream->exec-avg (get (:execute-latencies m) window)
+          stream->executed (get (:executed m) window)
+          ;; Weighted sum of execute latencies across all matched streams.
+          weighted-latency (reduce +
+                                   0.
+                                   (for [[stream avg] stream->exec-avg
+                                         :let [cnt (get stream->executed stream)]
+                                         :when (and avg cnt)]
+                                     (* avg cnt)))]
+      ;; NOTE(review): presumably latencies are in ms and uptime in seconds,
+      ;; hence the factor of 1000; the divisor is capped at the 600 window.
+      (div weighted-latency (* 1000 (min uptime 600))))))
+
+(defn- page+comp-dispatch
+  "Multimethod dispatch helper: returns the vector [page-type comp-type],
+  ignoring any additional arguments."
+  [page-type comp-type & _]
+  (vector page-type comp-type))
+
+(defmulti agg-pre-merge
--- End diff --
Agreed. I'll make them unique functions.
> UI Topology & Component Pages have long load times with large,
> highly-connected Topologies
> ------------------------------------------------------------------------------------------
>
> Key: STORM-820
> URL: https://issues.apache.org/jira/browse/STORM-820
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.11.0
> Reporter: Derek Dagit
> Assignee: Derek Dagit
>
> In the UI, the Topology Page and the Component Page each make a
> getTopologyInfoWithOpts thrift call to nimbus for executor heartbeat data.
> Metrics from this data are then aggregated by the UI daemon for display.
> When large, highly connected topologies are viewed in this way, the load
> times for each page can be minutes long. In addition, heap usage by the
> nimbus JVM can grow substantially as data for each executor, component, &
> stream is serialized to be sent to the UI.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)