Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/554#discussion_r30921890
  
    --- Diff: storm-core/src/clj/backtype/storm/ui/core.clj ---
    @@ -783,128 +504,180 @@
             "errorLapsedSecs" (get-error-time e)
             "error" (.get_error e)})}))
     
    -(defn spout-stats
    -  [window ^TopologyInfo topology-info component executors include-sys?]
    -  (let [window-hint (str " (" (window-hint window) ")")
    -        stats (get-filled-stats executors)
    -        stream-summary (-> stats (aggregate-spout-stats include-sys?))
    -        summary (-> stream-summary aggregate-spout-streams)]
    -    {"spoutSummary" (spout-summary-json
    -                      (.get_id topology-info) component summary window)
    -     "outputStats" (spout-output-stats stream-summary window)
    -     "executorStats" (spout-executor-stats (.get_id topology-info)
    -                                           executors window 
include-sys?)}))
    -
    -(defn bolt-summary
    -  [topology-id id stats window]
    -  (let [times (stats-times (:emitted stats))
    -        display-map (into {} (for [t times] [t pretty-uptime-sec]))
    -        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
    -    (for [k (concat times [":all-time"])
    -          :let [disp ((display-map k) k)]]
    -      {"window" k
    -       "windowPretty" disp
    -       "emitted" (get-in stats [:emitted k])
    -       "transferred" (get-in stats [:transferred k])
    -       "executeLatency" (float-str (get-in stats [:execute-latencies k]))
    -       "executed" (get-in stats [:executed k])
    -       "processLatency" (float-str (get-in stats [:process-latencies k]))
    -       "acked" (get-in stats [:acked k])
    -       "failed" (get-in stats [:failed k])})))
    -
    -(defn bolt-output-stats
    -  [stream-summary window]
    -  (let [stream-summary (-> stream-summary
    -                           swap-map-order
    -                           (get window)
    -                           (select-keys [:emitted :transferred])
    -                           swap-map-order)]
    -    (for [[s stats] stream-summary]
    -      {"stream" s
    -        "emitted" (nil-to-zero (:emitted stats))
    -        "transferred" (nil-to-zero (:transferred stats))})))
    -
    -(defn bolt-input-stats
    -  [stream-summary window]
    -  (let [stream-summary
    -        (-> stream-summary
    -            swap-map-order
    -            (get window)
    -            (select-keys [:acked :failed :process-latencies
    -                          :executed :execute-latencies])
    -            swap-map-order)]
    -    (for [[^GlobalStreamId s stats] stream-summary]
    -      {"component" (.get_componentId s)
    -       "encodedComponent" (url-encode (.get_componentId s))
    -       "stream" (.get_streamId s)
    -       "executeLatency" (float-str (:execute-latencies stats))
    -       "processLatency" (float-str (:process-latencies stats))
    -       "executed" (nil-to-zero (:executed stats))
    -       "acked" (nil-to-zero (:acked stats))
    -       "failed" (nil-to-zero (:failed stats))})))
    -
    -(defn bolt-executor-stats
    -  [topology-id executors window include-sys?]
    -  (for [^ExecutorSummary e executors
    -        :let [stats (.get_stats e)
    -              stats (if stats
    -                      (-> stats
    -                          (aggregate-bolt-stats include-sys?)
    -                          (aggregate-bolt-streams)
    -                          swap-map-order
    -                          (get window)))]]
    -    {"id" (pretty-executor-info (.get_executor_info e))
    -     "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
    -     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
    -     "host" (.get_host e)
    -     "port" (.get_port e)
    -     "emitted" (nil-to-zero (:emitted stats))
    -     "transferred" (nil-to-zero (:transferred stats))
    -     "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
    -     "executeLatency" (float-str (:execute-latencies stats))
    -     "executed" (nil-to-zero (:executed stats))
    -     "processLatency" (float-str (:process-latencies stats))
    -     "acked" (nil-to-zero (:acked stats))
    -     "failed" (nil-to-zero (:failed stats))
    -     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) 
topology-id)}))
    -
    -(defn bolt-stats
    -  [window ^TopologyInfo topology-info component executors include-sys?]
    -  (let [window-hint (str " (" (window-hint window) ")")
    -        stats (get-filled-stats executors)
    -        stream-summary (-> stats (aggregate-bolt-stats include-sys?))
    -        summary (-> stream-summary aggregate-bolt-streams)]
    -    {"boltStats" (bolt-summary (.get_id topology-info) component summary 
window)
    -     "inputStats" (bolt-input-stats stream-summary window)
    -     "outputStats" (bolt-output-stats stream-summary window)
    -     "executorStats" (bolt-executor-stats
    -                       (.get_id topology-info) executors window 
include-sys?)}))
    +(defmulti unpack-comp-agg-stat
    +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
    +
    +(defmethod unpack-comp-agg-stat ComponentType/BOLT
    +  [[window ^ComponentAggregateStats s]]
    +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
    +        ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
    +    {"window" window
    +     "windowPretty" (window-hint window)
    +     "emitted" (.get_emitted comm-s)
    +     "transferred" (.get_transferred comm-s)
    +     "acked" (.get_acked comm-s)
    +     "failed" (.get_failed comm-s)
    +     "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
    +     "processLatency"  (float-str (.get_process_latency_ms bolt-s))
    +     "executed" (.get_executed bolt-s)
    +     "capacity" (float-str (.get_capacity bolt-s))}))
    +
    +(defmethod unpack-comp-agg-stat ComponentType/SPOUT
    +  [[window ^ComponentAggregateStats s]]
    +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
    +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
    +    {"window" window
    +     "windowPretty" (window-hint window)
    +     "emitted" (.get_emitted comm-s)
    +     "transferred" (.get_transferred comm-s)
    +     "acked" (.get_acked comm-s)
    +     "failed" (.get_failed comm-s)
    +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
    +
    +(defn- unpack-bolt-input-stat
    +  [[^GlobalStreamId s ^ComponentAggregateStats stats]]
    +  (let [^SpecificAggregateStats sas (.get_specific_stats stats)
    +        ^BoltAggregateStats bas (.get_bolt sas)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        comp-id (.get_componentId s)]
    +    {"component" comp-id
    +     "encodedComponentId" (url-encode comp-id)
    +     "stream" (.get_streamId s)
    +     "executeLatency" (float-str (.get_execute_latency_ms bas))
    +     "processLatency" (float-str (.get_process_latency_ms bas))
    +     "executed" (nil-to-zero (.get_executed bas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))}))
    +
    +(defmulti unpack-comp-output-stat
    +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
    +
    +(defmethod unpack-comp-output-stat ComponentType/BOLT
    +  [[stream-id ^ComponentAggregateStats stats]]
    +  (let [^CommonAggregateStats cas (.get_common_stats stats)]
    +    {"stream" stream-id
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))}))
    +
    +(defmethod unpack-comp-output-stat ComponentType/SPOUT
    +  [[stream-id ^ComponentAggregateStats stats]]
    +  (let [^CommonAggregateStats cas (.get_common_stats stats)
    +        ^SpecificAggregateStats spec-s (.get_specific_stats stats)
    +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
    +    {"stream" stream-id
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))}))
    +
    +(defmulti unpack-comp-exec-stat
    +  (fn [_ ^ComponentAggregateStats cas] (.get_type (.get_stats 
^ExecutorAggregateStats cas))))
    +
    +(defmethod unpack-comp-exec-stat ComponentType/BOLT
    +  [topology-id ^ExecutorAggregateStats eas]
    +  (let [^ExecutorSummary summ (.get_exec_summary eas)
    +        ^ExecutorInfo info (.get_executor_info summ)
    +        ^ComponentAggregateStats stats (.get_stats eas)
    +        ^SpecificAggregateStats ss (.get_specific_stats stats)
    +        ^BoltAggregateStats bas (.get_bolt ss)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        host (.get_host summ)
    +        port (.get_port summ)
    +        exec-id (pretty-executor-info info)]
    +    {"id" exec-id
    +     "encodedId" (url-encode exec-id)
    +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
    +     "host" host
    +     "port" port
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "capacity" (float-str (nil-to-zero (.get_capacity bas)))
    +     "executeLatency" (float-str (.get_execute_latency_ms bas))
    +     "executed" (nil-to-zero (.get_executed bas))
    +     "processLatency" (float-str (.get_process_latency_ms bas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))
    +     "workerLogLink" (worker-log-link host port topology-id)}))
    +
    +(defmethod unpack-comp-exec-stat ComponentType/SPOUT
    +  [topology-id ^ExecutorAggregateStats eas]
    +  (let [^ExecutorSummary summ (.get_exec_summary eas)
    +        ^ExecutorInfo info (.get_executor_info summ)
    +        ^ComponentAggregateStats stats (.get_stats eas)
    +        ^SpecificAggregateStats ss (.get_specific_stats stats)
    +        ^SpoutAggregateStats sas (.get_spout ss)
    +        ^CommonAggregateStats cas (.get_common_stats stats)
    +        host (.get_host summ)
    +        port (.get_port summ)
    +        exec-id (pretty-executor-info info)]
    +    {"id" exec-id
    +     "encodedId" (url-encode exec-id)
    +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
    +     "host" host
    +     "port" port
    +     "emitted" (nil-to-zero (.get_emitted cas))
    +     "transferred" (nil-to-zero (.get_transferred cas))
    +     "completeLatency" (float-str (.get_complete_latency_ms sas))
    +     "acked" (nil-to-zero (.get_acked cas))
    +     "failed" (nil-to-zero (.get_failed cas))
    +     "workerLogLink" (worker-log-link host port topology-id)}))
    +
    +(defmulti unpack-component-page-info
    +  "Unpacks component-specific info to clojure data structures"
    +  (fn [^ComponentPageInfo info & _]
    +    (.get_component_type info)))
    +
    +(defmethod unpack-component-page-info ComponentType/BOLT
    +  [^ComponentPageInfo info topology-id window include-sys?]
    +  (merge
    +    {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
    +     "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats 
info))
    +     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats 
info))
    +     "executorStats" (map (partial unpack-comp-exec-stat topology-id)
    +                          (.get_exec_stats info))}
    +    (-> info .get_errors (component-errors topology-id))))
    +
    +(defmethod unpack-component-page-info ComponentType/SPOUT
    +  [^ComponentPageInfo info topology-id window include-sys?]
    +  (merge
    +    {"spoutStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
    --- End diff --
    
    I had changed it because it was named inconsistently, and I also should 
have updated STORM-UI-REST-API.md.
    
    However since it is not essential, I'll revert to using spoutSummary so 
that the REST API does not change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to