Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1352#discussion_r65499343
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
    @@ -1355,48 +1366,843 @@
     (defmethod blob-sync :local [conf nimbus]
       nil)
     
    -(defserverfn service-handler [conf inimbus]
    -  (.prepare inimbus conf (master-inimbus-dir conf))
    -  (log-message "Starting Nimbus with conf " conf)
    -  (let [nimbus (nimbus-data conf inimbus)
    -        blob-store (:blob-store nimbus)
    -        principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
    +(defn extract-cluster-metrics [^ClusterSummary summ]
    +  ; FIXME: this is nearly same to what ui/core have... should we extract 
it to another location?
    +  (let [sups (.get_supervisors summ)
    +        used-slots (reduce + (map #(.get_num_used_workers 
^SupervisorSummary %) sups))
    +        total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary 
%) sups))
    +        free-slots (- total-slots used-slots)
    +        topologies (.get_topologies_size summ)
    +        total-tasks (->> (.get_topologies summ)
    +                         (map #(.get_num_tasks ^TopologySummary %))
    +                         (reduce +))
    +        total-executors (->> (.get_topologies summ)
    +                             (map #(.get_num_executors ^TopologySummary %))
    +                             (reduce +))]
    +    {:cluster-info (IClusterMetricsConsumer$ClusterInfo. 
(System/currentTimeMillis))
    +     :data-points  (map
    +                     (fn [[k v]] (DataPoint. k v))
    +                     { "supervisors" (count sups)
    +                       "topologies" topologies
    +                       "slotsTotal" total-slots
    +                       "slotsUsed" used-slots
    +                       "slotsFree" free-slots
    +                       "executorsTotal" total-executors
    +                       "tasksTotal" total-tasks}
    +                     )}))
    +
    +; TODO: should we move this to another location?
    +(defn extract-supervisors-metrics [^ClusterSummary summ]
    +  (let [sups (.get_supervisors summ)]
    +    (map (fn [^SupervisorSummary sup]
    +           {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo.
    +                               (.get_host sup)
    +                               (.get_supervisor_id sup)
    +                               (System/currentTimeMillis))
    +            ; FIXME: this is similar to what ui/core have... should we 
extract it to another location?
    +            :data-points     (map
    +                               (fn [[k v]] (DataPoint. k v))
    +                               {"slotsTotal" (.get_num_workers sup)
    +                                "slotsUsed"  (.get_num_used_workers sup)
    +                                "totalMem"   (get (.get_total_resources 
sup) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
    +                                "totalCpu"   (get (.get_total_resources 
sup) Config/SUPERVISOR_CPU_CAPACITY)
    +                                "usedMem"    (.get_used_mem sup)
    +                                "usedCpu"    (.get_used_cpu sup)})
    +            })
    +         sups)))
    +
    +(defn send-cluster-metrics-to-executors [nimbus-service nimbus]
    +  (let [cluster-summary (.getClusterInfo nimbus-service)
    +        cluster-metrics (extract-cluster-metrics cluster-summary)
    +        supervisors-metrics (extract-supervisors-metrics cluster-summary)]
    +    (dofor
    +      [consumer-executor (:cluster-consumer-executors nimbus)]
    +      (do
    +        (.handleDataPoints consumer-executor (:cluster-info 
cluster-metrics) (:data-points cluster-metrics))
    +        (dofor
    +          [supervisor-metrics supervisors-metrics]
    +          (do
    +          (log-message (:supervisor-info supervisor-metrics) " / " 
(:data-points supervisor-metrics))
    +          (.handleDataPoints consumer-executor (:supervisor-info 
supervisor-metrics) (:data-points supervisor-metrics))))))))
    +
    +(defn mk-reified-nimbus [nimbus conf blob-store]
    +  (let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
             admin-users (or (.get conf NIMBUS-ADMINS) [])
             get-common-topo-info
    -          (fn [^String storm-id operation]
    -            (let [storm-cluster-state (:storm-cluster-state nimbus)
    -                  topology-conf (try-read-storm-conf conf storm-id 
blob-store)
    -                  storm-name (topology-conf TOPOLOGY-NAME)
    -                  _ (check-authorization! nimbus
    -                                          storm-name
    -                                          topology-conf
    -                                          operation)
    -                  topology (try-read-storm-topology storm-id blob-store)
    -                  task->component (storm-task-info topology topology-conf)
    -                  base (.storm-base storm-cluster-state storm-id nil)
    -                  launch-time-secs (if base (:launch-time-secs base)
    -                                     (throw
    -                                       (NotAliveException. (str 
storm-id))))
    -                  assignment (.assignment-info storm-cluster-state 
storm-id nil)
    -                  beats (map-val :heartbeat (get @(:heartbeats-cache 
nimbus)
    -                                                 storm-id))
    -                  all-components (set (vals task->component))]
    -              {:storm-name storm-name
    -               :storm-cluster-state storm-cluster-state
    -               :all-components all-components
    -               :launch-time-secs launch-time-secs
    -               :assignment assignment
    -               :beats beats
    -               :topology topology
    -               :task->component task->component
    -               :base base}))
    +        (fn [^String storm-id operation]
    --- End diff --
    
    it will be great if we can preserve previous spaces looks like this diff 
showing up unnecessarily 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to