[
https://issues.apache.org/jira/browse/STORM-820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14556570#comment-14556570
]
ASF GitHub Bot commented on STORM-820:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/554#discussion_r30919755
--- Diff: storm-core/src/clj/backtype/storm/ui/core.clj ---
@@ -783,128 +504,180 @@
"errorLapsedSecs" (get-error-time e)
"error" (.get_error e)})}))
-(defn spout-stats
- [window ^TopologyInfo topology-info component executors include-sys?]
- (let [window-hint (str " (" (window-hint window) ")")
- stats (get-filled-stats executors)
- stream-summary (-> stats (aggregate-spout-stats include-sys?))
- summary (-> stream-summary aggregate-spout-streams)]
- {"spoutSummary" (spout-summary-json
- (.get_id topology-info) component summary window)
- "outputStats" (spout-output-stats stream-summary window)
- "executorStats" (spout-executor-stats (.get_id topology-info)
- executors window
include-sys?)}))
-
-(defn bolt-summary
- [topology-id id stats window]
- (let [times (stats-times (:emitted stats))
- display-map (into {} (for [t times] [t pretty-uptime-sec]))
- display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
- {"window" k
- "windowPretty" disp
- "emitted" (get-in stats [:emitted k])
- "transferred" (get-in stats [:transferred k])
- "executeLatency" (float-str (get-in stats [:execute-latencies k]))
- "executed" (get-in stats [:executed k])
- "processLatency" (float-str (get-in stats [:process-latencies k]))
- "acked" (get-in stats [:acked k])
- "failed" (get-in stats [:failed k])})))
-
-(defn bolt-output-stats
- [stream-summary window]
- (let [stream-summary (-> stream-summary
- swap-map-order
- (get window)
- (select-keys [:emitted :transferred])
- swap-map-order)]
- (for [[s stats] stream-summary]
- {"stream" s
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))})))
-
-(defn bolt-input-stats
- [stream-summary window]
- (let [stream-summary
- (-> stream-summary
- swap-map-order
- (get window)
- (select-keys [:acked :failed :process-latencies
- :executed :execute-latencies])
- swap-map-order)]
- (for [[^GlobalStreamId s stats] stream-summary]
- {"component" (.get_componentId s)
- "encodedComponent" (url-encode (.get_componentId s))
- "stream" (.get_streamId s)
- "executeLatency" (float-str (:execute-latencies stats))
- "processLatency" (float-str (:process-latencies stats))
- "executed" (nil-to-zero (:executed stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))})))
-
-(defn bolt-executor-stats
- [topology-id executors window include-sys?]
- (for [^ExecutorSummary e executors
- :let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-bolt-stats include-sys?)
- (aggregate-bolt-streams)
- swap-map-order
- (get window)))]]
- {"id" (pretty-executor-info (.get_executor_info e))
- "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
- "uptime" (pretty-uptime-sec (.get_uptime_secs e))
- "host" (.get_host e)
- "port" (.get_port e)
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))
- "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
- "executeLatency" (float-str (:execute-latencies stats))
- "executed" (nil-to-zero (:executed stats))
- "processLatency" (float-str (:process-latencies stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))
- "workerLogLink" (worker-log-link (.get_host e) (.get_port e)
topology-id)}))
-
-(defn bolt-stats
- [window ^TopologyInfo topology-info component executors include-sys?]
- (let [window-hint (str " (" (window-hint window) ")")
- stats (get-filled-stats executors)
- stream-summary (-> stats (aggregate-bolt-stats include-sys?))
- summary (-> stream-summary aggregate-bolt-streams)]
- {"boltStats" (bolt-summary (.get_id topology-info) component summary
window)
- "inputStats" (bolt-input-stats stream-summary window)
- "outputStats" (bolt-output-stats stream-summary window)
- "executorStats" (bolt-executor-stats
- (.get_id topology-info) executors window
include-sys?)}))
+(defmulti unpack-comp-agg-stat
+ (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
+
+(defmethod unpack-comp-agg-stat ComponentType/BOLT
+ [[window ^ComponentAggregateStats s]]
+ (let [^CommonAggregateStats comm-s (.get_common_stats s)
+ ^SpecificAggregateStats spec-s (.get_specific_stats s)
+ ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
+ {"window" window
+ "windowPretty" (window-hint window)
+ "emitted" (.get_emitted comm-s)
+ "transferred" (.get_transferred comm-s)
+ "acked" (.get_acked comm-s)
+ "failed" (.get_failed comm-s)
+ "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
+ "processLatency" (float-str (.get_process_latency_ms bolt-s))
+ "executed" (.get_executed bolt-s)
+ "capacity" (float-str (.get_capacity bolt-s))}))
+
+(defmethod unpack-comp-agg-stat ComponentType/SPOUT
+ [[window ^ComponentAggregateStats s]]
+ (let [^CommonAggregateStats comm-s (.get_common_stats s)
+ ^SpecificAggregateStats spec-s (.get_specific_stats s)
+ ^SpoutAggregateStats spout-s (.get_spout spec-s)]
+ {"window" window
+ "windowPretty" (window-hint window)
+ "emitted" (.get_emitted comm-s)
+ "transferred" (.get_transferred comm-s)
+ "acked" (.get_acked comm-s)
+ "failed" (.get_failed comm-s)
+ "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
+
+(defn- unpack-bolt-input-stat
+ [[^GlobalStreamId s ^ComponentAggregateStats stats]]
+ (let [^SpecificAggregateStats sas (.get_specific_stats stats)
+ ^BoltAggregateStats bas (.get_bolt sas)
+ ^CommonAggregateStats cas (.get_common_stats stats)
+ comp-id (.get_componentId s)]
+ {"component" comp-id
+ "encodedComponentId" (url-encode comp-id)
+ "stream" (.get_streamId s)
+ "executeLatency" (float-str (.get_execute_latency_ms bas))
+ "processLatency" (float-str (.get_process_latency_ms bas))
+ "executed" (nil-to-zero (.get_executed bas))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))}))
+
+(defmulti unpack-comp-output-stat
+ (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
+
+(defmethod unpack-comp-output-stat ComponentType/BOLT
+ [[stream-id ^ComponentAggregateStats stats]]
+ (let [^CommonAggregateStats cas (.get_common_stats stats)]
+ {"stream" stream-id
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))}))
+
+(defmethod unpack-comp-output-stat ComponentType/SPOUT
+ [[stream-id ^ComponentAggregateStats stats]]
+ (let [^CommonAggregateStats cas (.get_common_stats stats)
+ ^SpecificAggregateStats spec-s (.get_specific_stats stats)
+ ^SpoutAggregateStats spout-s (.get_spout spec-s)]
+ {"stream" stream-id
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))
+ "completeLatency" (float-str (.get_complete_latency_ms spout-s))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))}))
+
+(defmulti unpack-comp-exec-stat
+ (fn [_ ^ComponentAggregateStats cas] (.get_type (.get_stats
^ExecutorAggregateStats cas))))
+
+(defmethod unpack-comp-exec-stat ComponentType/BOLT
+ [topology-id ^ExecutorAggregateStats eas]
+ (let [^ExecutorSummary summ (.get_exec_summary eas)
+ ^ExecutorInfo info (.get_executor_info summ)
+ ^ComponentAggregateStats stats (.get_stats eas)
+ ^SpecificAggregateStats ss (.get_specific_stats stats)
+ ^BoltAggregateStats bas (.get_bolt ss)
+ ^CommonAggregateStats cas (.get_common_stats stats)
+ host (.get_host summ)
+ port (.get_port summ)
+ exec-id (pretty-executor-info info)]
+ {"id" exec-id
+ "encodedId" (url-encode exec-id)
+ "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+ "host" host
+ "port" port
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))
+ "capacity" (float-str (nil-to-zero (.get_capacity bas)))
+ "executeLatency" (float-str (.get_execute_latency_ms bas))
+ "executed" (nil-to-zero (.get_executed bas))
+ "processLatency" (float-str (.get_process_latency_ms bas))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))
+ "workerLogLink" (worker-log-link host port topology-id)}))
+
+(defmethod unpack-comp-exec-stat ComponentType/SPOUT
+ [topology-id ^ExecutorAggregateStats eas]
+ (let [^ExecutorSummary summ (.get_exec_summary eas)
+ ^ExecutorInfo info (.get_executor_info summ)
+ ^ComponentAggregateStats stats (.get_stats eas)
+ ^SpecificAggregateStats ss (.get_specific_stats stats)
+ ^SpoutAggregateStats sas (.get_spout ss)
+ ^CommonAggregateStats cas (.get_common_stats stats)
+ host (.get_host summ)
+ port (.get_port summ)
+ exec-id (pretty-executor-info info)]
+ {"id" exec-id
+ "encodedId" (url-encode exec-id)
+ "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+ "host" host
+ "port" port
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))
+ "completeLatency" (float-str (.get_complete_latency_ms sas))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))
+ "workerLogLink" (worker-log-link host port topology-id)}))
+
+(defmulti unpack-component-page-info
+ "Unpacks component-specific info to clojure data structures"
+ (fn [^ComponentPageInfo info & _]
+ (.get_component_type info)))
+
+(defmethod unpack-component-page-info ComponentType/BOLT
+ [^ComponentPageInfo info topology-id window include-sys?]
+ (merge
+ {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
+ "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats
info))
+ "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats
info))
+ "executorStats" (map (partial unpack-comp-exec-stat topology-id)
+ (.get_exec_stats info))}
+ (-> info .get_errors (component-errors topology-id))))
+
+(defmethod unpack-component-page-info ComponentType/SPOUT
+ [^ComponentPageInfo info topology-id window include-sys?]
+ (merge
+ {"spoutStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
--- End diff --
Is this a backwards-incompatible change in the RESTful web service? I see
that the mustache tags changed. I'm not sure we want to do that if someone is
relying on them, even if they were not named well before.
> UI Topology & Component Pages have long load times with large,
> highly-connected Topologies
> ------------------------------------------------------------------------------------------
>
> Key: STORM-820
> URL: https://issues.apache.org/jira/browse/STORM-820
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.11.0
> Reporter: Derek Dagit
> Assignee: Derek Dagit
>
> In the UI, the Topology Page and the Component Page each make a
> getTopologyInfoWithOpts thrift call to nimbus for executor heartbeat data.
> Metrics from this data are then aggregated by the UI daemon for display.
> When large, highly-connected topologies are viewed in this way, the
> load times for each page can be minutes long. In addition, heap usage by the
> nimbus JVM can grow substantially as data for each executor, component, &
> stream is serialized to be sent to the UI.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)