[
https://issues.apache.org/jira/browse/STORM-820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14556545#comment-14556545
]
ASF GitHub Bot commented on STORM-820:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/554#discussion_r30918410
--- Diff: storm-core/src/clj/backtype/storm/stats.clj ---
@@ -373,4 +360,1248 @@
(ExecutorStats. (window-set-converter (:emitted stats) str)
(window-set-converter (:transferred stats) str)
specific-stats
- rate)))
\ No newline at end of file
+ rate)))
+
+(defn- agg-bolt-lat-and-count
+ "Aggregates number executed and process & execute latencies across all
+ streams."
+ [idk->exec-avg idk->proc-avg idk->num-executed]
+ {:pre (apply = (map #(set (keys %))
+ [idk->exec-avg
+ idk->proc-avg
+ idk->num-executed]))}
+ (letfn [(weight-avg [[id avg]] (let [num-e (get idk->num-executed id)]
+ (if (and avg num-e)
+ (* avg num-e)
+ 0)))]
+ {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
+ :processLatencyTotal (sum (map weight-avg idk->proc-avg))
+ :executed (sum (vals idk->num-executed))}))
+
+(defn- agg-spout-lat-and-count
+ "Aggregates number acked and complete latencies across all streams."
+ [sid->comp-avg sid->num-acked]
+ {:pre (apply = (map #(set (keys %))
+ [sid->comp-avg
+ sid->num-acked]))}
+ (letfn [(weight-avg [[id avg]] (* avg (get sid->num-acked id)))]
+ {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
+ :acked (sum (vals sid->num-acked))}))
+
+(defn add-pairs
+ ([] [0 0])
+ ([[a1 a2] [b1 b2]]
+ [(+ a1 b1) (+ a2 b2)]))
+
+(defn mk-include-sys-fn
+ [include-sys?]
+ (if include-sys?
+ (fn [_] true)
+ (fn [stream] (and (string? stream) (not (system-id? stream))))))
+
+(defn mk-include-sys-filter
+ "Returns a function that includes or excludes map entries whose keys are
+ system ids."
+ [include-sys?]
+ (if include-sys?
+ identity
+ (partial filter-key (mk-include-sys-fn false))))
+
+(defn- agg-bolt-streams-lat-and-count
+ "Aggregates number executed and process & execute latencies."
+ [idk->exec-avg idk->proc-avg idk->executed]
+ {:pre (apply = (map #(set (keys %))
+ [idk->exec-avg
+ idk->proc-avg
+ idk->executed]))}
+ (letfn [(weight-avg [id avg] (let [num-e (idk->executed id)]
+ (if (and avg num-e)
+ (* avg num-e)
+ 0)))]
+ (into {}
+ (for [k (keys idk->exec-avg)]
+ [k {:executeLatencyTotal (weight-avg k (idk->exec-avg k))
+ :processLatencyTotal (weight-avg k (idk->proc-avg k))
+ :executed (idk->executed k)}]))))
+
+(defn- agg-spout-streams-lat-and-count
+ "Aggregates number acked and complete latencies."
+ [idk->comp-avg idk->acked]
+ {:pre (apply = (map #(set (keys %))
+ [idk->comp-avg
+ idk->acked]))}
+ (letfn [(weight-avg [id avg] (let [num-e (get idk->acked id)]
+ (if (and avg num-e)
+ (* avg num-e)
+ 0)))]
+ (into {}
+ (for [k (keys idk->comp-avg)]
+ [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
+ :acked (get idk->acked k)}]))))
+
+(defn swap-map-order
+ "{:a {:A 3, :B 5}, :b {:A 1, :B 2}}
--- End diff --
This doc string is a bit confusing. I understand what the method does, but
I had to read the code to really get it.
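To illustrate the kind of docstring that might read more clearly, here is a minimal sketch. It assumes the function inverts the nesting of a two-level map, which the truncated diff above does not confirm. The name `swap-map-order-sketch` and its body are hypothetical and are not taken from the pull request.

```clojure
;; Hypothetical sketch, not the code from the pull request.
;; Assumption: the function turns {outer {inner v}} into {inner {outer v}}.
(defn swap-map-order-sketch
  "Swaps the two levels of keys in a nested map.

  Input:  {:a {:A 3, :B 5}, :b {:A 1, :B 2}}
  Output: {:A {:a 3, :b 1}, :B {:a 5, :b 2}}"
  [m]
  (reduce (fn [acc [outer inner->v]]
            ;; Re-insert every leaf value under [inner outer].
            (reduce (fn [acc [inner v]]
                      (assoc-in acc [inner outer] v))
                    acc
                    inner->v))
          {}
          m))

(swap-map-order-sketch {:a {:A 3, :B 5}, :b {:A 1, :B 2}})
;; => {:A {:a 3, :b 1}, :B {:a 5, :b 2}}
```

Stating the input and the output side by side in the docstring lets a reader see the transformation without reading the body.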
> UI Topology & Component Pages have long load times with large,
> highly-connected Topologies
> ------------------------------------------------------------------------------------------
>
> Key: STORM-820
> URL: https://issues.apache.org/jira/browse/STORM-820
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.11.0
> Reporter: Derek Dagit
> Assignee: Derek Dagit
>
> In the UI, the Topology Page and the Component Page each make a
> getTopologyInfoWithOpts thrift call to nimbus for executor heartbeat data.
> Metrics from this data are then aggregated by the UI daemon for display.
> When large, highly connected topologies are viewed in this way, the
> load times for each page can be minutes long. In addition, heap usage by the
> nimbus JVM can grow substantially as data for each executor, component, &
> stream is serialized to be sent to the UI.