[ 
https://issues.apache.org/jira/browse/STORM-820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14556570#comment-14556570
 ] 

ASF GitHub Bot commented on STORM-820:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30919755
  
    --- Diff: storm-core/src/clj/backtype/storm/ui/core.clj ---
    @@ -783,128 +504,180 @@
             "errorLapsedSecs" (get-error-time e)
             "error" (.get_error e)})}))
     
    -(defn spout-stats
    -  [window ^TopologyInfo topology-info component executors include-sys?]
    -  (let [window-hint (str " (" (window-hint window) ")")
    -        stats (get-filled-stats executors)
    -        stream-summary (-> stats (aggregate-spout-stats include-sys?))
    -        summary (-> stream-summary aggregate-spout-streams)]
    -    {"spoutSummary" (spout-summary-json
    -                      (.get_id topology-info) component summary window)
    -     "outputStats" (spout-output-stats stream-summary window)
    -     "executorStats" (spout-executor-stats (.get_id topology-info)
    -                                           executors window 
include-sys?)}))
    -
    -(defn bolt-summary
    -  [topology-id id stats window]
    -  (let [times (stats-times (:emitted stats))
    -        display-map (into {} (for [t times] [t pretty-uptime-sec]))
    -        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
    -    (for [k (concat times [":all-time"])
    -          :let [disp ((display-map k) k)]]
    -      {"window" k
    -       "windowPretty" disp
    -       "emitted" (get-in stats [:emitted k])
    -       "transferred" (get-in stats [:transferred k])
    -       "executeLatency" (float-str (get-in stats [:execute-latencies k]))
    -       "executed" (get-in stats [:executed k])
    -       "processLatency" (float-str (get-in stats [:process-latencies k]))
    -       "acked" (get-in stats [:acked k])
    -       "failed" (get-in stats [:failed k])})))
    -
    -(defn bolt-output-stats
    -  [stream-summary window]
    -  (let [stream-summary (-> stream-summary
    -                           swap-map-order
    -                           (get window)
    -                           (select-keys [:emitted :transferred])
    -                           swap-map-order)]
    -    (for [[s stats] stream-summary]
    -      {"stream" s
    -        "emitted" (nil-to-zero (:emitted stats))
    -        "transferred" (nil-to-zero (:transferred stats))})))
    -
    -(defn bolt-input-stats
    -  [stream-summary window]
    -  (let [stream-summary
    -        (-> stream-summary
    -            swap-map-order
    -            (get window)
    -            (select-keys [:acked :failed :process-latencies
    -                          :executed :execute-latencies])
    -            swap-map-order)]
    -    (for [[^GlobalStreamId s stats] stream-summary]
    -      {"component" (.get_componentId s)
    -       "encodedComponent" (url-encode (.get_componentId s))
    -       "stream" (.get_streamId s)
    -       "executeLatency" (float-str (:execute-latencies stats))
    -       "processLatency" (float-str (:process-latencies stats))
    -       "executed" (nil-to-zero (:executed stats))
    -       "acked" (nil-to-zero (:acked stats))
    -       "failed" (nil-to-zero (:failed stats))})))
    -
    -(defn bolt-executor-stats
    -  [topology-id executors window include-sys?]
    -  (for [^ExecutorSummary e executors
    -        :let [stats (.get_stats e)
    -              stats (if stats
    -                      (-> stats
    -                          (aggregate-bolt-stats include-sys?)
    -                          (aggregate-bolt-streams)
    -                          swap-map-order
    -                          (get window)))]]
    -    {"id" (pretty-executor-info (.get_executor_info e))
    -     "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
    -     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
    -     "host" (.get_host e)
    -     "port" (.get_port e)
    -     "emitted" (nil-to-zero (:emitted stats))
    -     "transferred" (nil-to-zero (:transferred stats))
    -     "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
    -     "executeLatency" (float-str (:execute-latencies stats))
    -     "executed" (nil-to-zero (:executed stats))
    -     "processLatency" (float-str (:process-latencies stats))
    -     "acked" (nil-to-zero (:acked stats))
    -     "failed" (nil-to-zero (:failed stats))
    -     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) 
topology-id)}))
    -
    -(defn bolt-stats
    -  [window ^TopologyInfo topology-info component executors include-sys?]
    -  (let [window-hint (str " (" (window-hint window) ")")
    -        stats (get-filled-stats executors)
    -        stream-summary (-> stats (aggregate-bolt-stats include-sys?))
    -        summary (-> stream-summary aggregate-bolt-streams)]
    -    {"boltStats" (bolt-summary (.get_id topology-info) component summary 
window)
    -     "inputStats" (bolt-input-stats stream-summary window)
    -     "outputStats" (bolt-output-stats stream-summary window)
    -     "executorStats" (bolt-executor-stats
    -                       (.get_id topology-info) executors window 
include-sys?)}))
    +(defmulti unpack-comp-agg-stat
    +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
    +
    +(defmethod unpack-comp-agg-stat ComponentType/BOLT
    +  [[window ^ComponentAggregateStats s]]
    +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
    +        ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
    +    {"window" window
    +     "windowPretty" (window-hint window)
    +     "emitted" (.get_emitted comm-s)
    +     "transferred" (.get_transferred comm-s)
    +     "acked" (.get_acked comm-s)
    +     "failed" (.get_failed comm-s)
    +     "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
    +     "processLatency"  (float-str (.get_process_latency_ms bolt-s))
    +     "executed" (.get_executed bolt-s)
    +     "capacity" (float-str (.get_capacity bolt-s))}))
    +
    +(defmethod unpack-comp-agg-stat ComponentType/SPOUT
    +  [[window ^ComponentAggregateStats s]]
    +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
    +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
    +    {"window" window
    +     "windowPretty" (window-hint window)
    +     "emitted" (.get_emitted comm-s)
    +     "transferred" (.get_transferred comm-s)
    +     "acked" (.get_acked comm-s)
    +     "failed" (.get_failed comm-s)
    +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
    +
    +(defn- unpack-bolt-input-stat
    +  [[^GlobalStreamId s ^ComponentAggregateStats stats]]
    +  (let [^SpecificAggregateStats sas (.get_specific_stats stats)
    +        ^BoltAggregateStats bas (.get_bolt sas)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        comp-id (.get_componentId s)]
    +    {"component" comp-id
    +     "encodedComponentId" (url-encode comp-id)
    +     "stream" (.get_streamId s)
    +     "executeLatency" (float-str (.get_execute_latency_ms bas))
    +     "processLatency" (float-str (.get_process_latency_ms bas))
    +     "executed" (nil-to-zero (.get_executed bas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))}))
    +
    +(defmulti unpack-comp-output-stat
    +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
    +
    +(defmethod unpack-comp-output-stat ComponentType/BOLT
    +  [[stream-id ^ComponentAggregateStats stats]]
    +  (let [^CommonAggregateStats cas (.get_common_stats stats)]
    +    {"stream" stream-id
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))}))
    +
    +(defmethod unpack-comp-output-stat ComponentType/SPOUT
    +  [[stream-id ^ComponentAggregateStats stats]]
    +  (let [^CommonAggregateStats cas (.get_common_stats stats)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats stats)
    +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
    +    {"stream" stream-id
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))}))
    +
    +(defmulti unpack-comp-exec-stat
    +  (fn [_ ^ComponentAggregateStats cas] (.get_type (.get_stats 
^ExecutorAggregateStats cas))))
    +
    +(defmethod unpack-comp-exec-stat ComponentType/BOLT
    +  [topology-id ^ExecutorAggregateStats eas]
    +  (let [^ExecutorSummary summ (.get_exec_summary eas)
    +        ^ExecutorInfo info (.get_executor_info summ)
    +        ^ComponentAggregateStats stats (.get_stats eas)
    +        ^SpecificAggregateStats ss (.get_specific_stats stats)
    +        ^BoltAggregateStats bas (.get_bolt ss)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        host (.get_host summ)
    +        port (.get_port summ)
    +        exec-id (pretty-executor-info info)]
    +    {"id" exec-id
    +     "encodedId" (url-encode exec-id)
    +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
    +     "host" host
    +     "port" port
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "capacity" (float-str (nil-to-zero (.get_capacity bas)))
    +     "executeLatency" (float-str (.get_execute_latency_ms bas))
    +     "executed" (nil-to-zero (.get_executed bas))
    +     "processLatency" (float-str (.get_process_latency_ms bas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))
    +     "workerLogLink" (worker-log-link host port topology-id)}))
    +
    +(defmethod unpack-comp-exec-stat ComponentType/SPOUT
    +  [topology-id ^ExecutorAggregateStats eas]
    +  (let [^ExecutorSummary summ (.get_exec_summary eas)
    +        ^ExecutorInfo info (.get_executor_info summ)
    +        ^ComponentAggregateStats stats (.get_stats eas)
    +        ^SpecificAggregateStats ss (.get_specific_stats stats)
    +        ^SpoutAggregateStats sas (.get_spout ss)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        host (.get_host summ)
    +        port (.get_port summ)
    +        exec-id (pretty-executor-info info)]
    +    {"id" exec-id
    +     "encodedId" (url-encode exec-id)
    +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
    +     "host" host
    +     "port" port
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "completeLatency" (float-str (.get_complete_latency_ms sas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))
    +     "workerLogLink" (worker-log-link host port topology-id)}))
    +
    +(defmulti unpack-component-page-info
    +  "Unpacks component-specific info to clojure data structures"
    +  (fn [^ComponentPageInfo info & _]
    +    (.get_component_type info)))
    +
    +(defmethod unpack-component-page-info ComponentType/BOLT
    +  [^ComponentPageInfo info topology-id window include-sys?]
    +  (merge
    +    {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
    +     "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats 
info))
    +     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats 
info))
    +     "executorStats" (map (partial unpack-comp-exec-stat topology-id)
    +                          (.get_exec_stats info))}
    +    (-> info .get_errors (component-errors topology-id))))
    +
    +(defmethod unpack-component-page-info ComponentType/SPOUT
    +  [^ComponentPageInfo info topology-id window include-sys?]
    +  (merge
    +    {"spoutStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
    --- End diff --
    
    Is this a backwards-incompatible change in the RESTful web service?  I see 
that the mustache tags changed.  I'm not sure we want to do that if someone is 
relying on this, even if the old names were not the best.


> UI Topology & Component Pages have long load times with large, 
> highly-connected Topologies
> ------------------------------------------------------------------------------------------
>
>                 Key: STORM-820
>                 URL: https://issues.apache.org/jira/browse/STORM-820
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.11.0
>            Reporter: Derek Dagit
>            Assignee: Derek Dagit
>
> In the UI, the Topology Page and the Component Page each make a 
> getTopologyInfoWithOpts thrift call to nimbus for executor heartbeat data. 
> Metrics from this data are then aggregated by the UI daemon for display.
> When large, highly-connected topologies are viewed in this way, the 
> load times for each page can be minutes long.  In addition, heap usage by the 
> nimbus JVM can grow substantially as data for each executor, component, & 
> stream is serialized to be sent to the UI.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to