[ 
https://issues.apache.org/jira/browse/STORM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15311916#comment-15311916
 ] 

ASF GitHub Bot commented on STORM-1723:
---------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1352#discussion_r65499343
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
    @@ -1355,48 +1366,843 @@
     (defmethod blob-sync :local [conf nimbus]
       nil)
     
    -(defserverfn service-handler [conf inimbus]
    -  (.prepare inimbus conf (master-inimbus-dir conf))
    -  (log-message "Starting Nimbus with conf " conf)
    -  (let [nimbus (nimbus-data conf inimbus)
    -        blob-store (:blob-store nimbus)
    -        principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
    +(defn extract-cluster-metrics [^ClusterSummary summ]
    +  ; FIXME: this is nearly same to what ui/core have... should we extract 
it to another location?
    +  (let [sups (.get_supervisors summ)
    +        used-slots (reduce + (map #(.get_num_used_workers 
^SupervisorSummary %) sups))
    +        total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary 
%) sups))
    +        free-slots (- total-slots used-slots)
    +        topologies (.get_topologies_size summ)
    +        total-tasks (->> (.get_topologies summ)
    +                         (map #(.get_num_tasks ^TopologySummary %))
    +                         (reduce +))
    +        total-executors (->> (.get_topologies summ)
    +                             (map #(.get_num_executors ^TopologySummary %))
    +                             (reduce +))]
    +    {:cluster-info (IClusterMetricsConsumer$ClusterInfo. 
(System/currentTimeMillis))
    +     :data-points  (map
    +                     (fn [[k v]] (DataPoint. k v))
    +                     { "supervisors" (count sups)
    +                       "topologies" topologies
    +                       "slotsTotal" total-slots
    +                       "slotsUsed" used-slots
    +                       "slotsFree" free-slots
    +                       "executorsTotal" total-executors
    +                       "tasksTotal" total-tasks}
    +                     )}))
    +
    +; TODO: should we move this to another location?
    +(defn extract-supervisors-metrics [^ClusterSummary summ]
    +  (let [sups (.get_supervisors summ)]
    +    (map (fn [^SupervisorSummary sup]
    +           {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo.
    +                               (.get_host sup)
    +                               (.get_supervisor_id sup)
    +                               (System/currentTimeMillis))
    +            ; FIXME: this is similar to what ui/core have... should we 
extract it to another location?
    +            :data-points     (map
    +                               (fn [[k v]] (DataPoint. k v))
    +                               {"slotsTotal" (.get_num_workers sup)
    +                                "slotsUsed"  (.get_num_used_workers sup)
    +                                "totalMem"   (get (.get_total_resources 
sup) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
    +                                "totalCpu"   (get (.get_total_resources 
sup) Config/SUPERVISOR_CPU_CAPACITY)
    +                                "usedMem"    (.get_used_mem sup)
    +                                "usedCpu"    (.get_used_cpu sup)})
    +            })
    +         sups)))
    +
    +(defn send-cluster-metrics-to-executors [nimbus-service nimbus]
    +  (let [cluster-summary (.getClusterInfo nimbus-service)
    +        cluster-metrics (extract-cluster-metrics cluster-summary)
    +        supervisors-metrics (extract-supervisors-metrics cluster-summary)]
    +    (dofor
    +      [consumer-executor (:cluster-consumer-executors nimbus)]
    +      (do
    +        (.handleDataPoints consumer-executor (:cluster-info 
cluster-metrics) (:data-points cluster-metrics))
    +        (dofor
    +          [supervisor-metrics supervisors-metrics]
    +          (do
    +          (log-message (:supervisor-info supervisor-metrics) " / " 
(:data-points supervisor-metrics))
    +          (.handleDataPoints consumer-executor (:supervisor-info 
supervisor-metrics) (:data-points supervisor-metrics))))))))
    +
    +(defn mk-reified-nimbus [nimbus conf blob-store]
    +  (let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
             admin-users (or (.get conf NIMBUS-ADMINS) [])
             get-common-topo-info
    -          (fn [^String storm-id operation]
    -            (let [storm-cluster-state (:storm-cluster-state nimbus)
    -                  topology-conf (try-read-storm-conf conf storm-id 
blob-store)
    -                  storm-name (topology-conf TOPOLOGY-NAME)
    -                  _ (check-authorization! nimbus
    -                                          storm-name
    -                                          topology-conf
    -                                          operation)
    -                  topology (try-read-storm-topology storm-id blob-store)
    -                  task->component (storm-task-info topology topology-conf)
    -                  base (.storm-base storm-cluster-state storm-id nil)
    -                  launch-time-secs (if base (:launch-time-secs base)
    -                                     (throw
    -                                       (NotAliveException. (str 
storm-id))))
    -                  assignment (.assignment-info storm-cluster-state 
storm-id nil)
    -                  beats (map-val :heartbeat (get @(:heartbeats-cache 
nimbus)
    -                                                 storm-id))
    -                  all-components (set (vals task->component))]
    -              {:storm-name storm-name
    -               :storm-cluster-state storm-cluster-state
    -               :all-components all-components
    -               :launch-time-secs launch-time-secs
    -               :assignment assignment
    -               :beats beats
    -               :topology topology
    -               :task->component task->component
    -               :base base}))
    +        (fn [^String storm-id operation]
    --- End diff --
    
    It would be great if we could preserve the previous whitespace; it looks 
like this part of the diff is showing up unnecessarily.


> Introduce ClusterMetricsConsumer
> --------------------------------
>
>                 Key: STORM-1723
>                 URL: https://issues.apache.org/jira/browse/STORM-1723
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>    Affects Versions: 2.0.0, 1.0.1
>            Reporter: Jungtaek Lim
>            Assignee: Jungtaek Lim
>
> NOTE: This issue has already been discussed briefly. Please refer to 
> [here|http://mail-archives.apache.org/mod_mbox/storm-dev/201604.mbox/%3CCAF5108hDCcMKxLXKUYLReOoKkNNdgW2YudweR+mKr=1hlsl...@mail.gmail.com%3E]
>  for details.
> This issue focuses on introducing ClusterMetricsConsumer and providing an 
> interface that lets users plug in their own consumers.
> ClusterMetricsConsumers will be attached to Nimbus, and the leader of Nimbus 
> will push cluster-related metrics to ClusterMetricsConsumer.
> The requirements for ClusterMetricsConsumer are as follows:
> - Only the leader of Nimbus should publish cluster metrics to the consumer.
> - Nimbus shouldn't be affected by a crash or heavy latency in a consumer.
> - A consumer should be resilient to crashes, or Nimbus should take care of it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to