[
https://issues.apache.org/jira/browse/STORM-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15311916#comment-15311916
]
ASF GitHub Bot commented on STORM-1723:
---------------------------------------
Github user harshach commented on a diff in the pull request:
https://github.com/apache/storm/pull/1352#discussion_r65499343
--- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
@@ -1355,48 +1366,843 @@
(defmethod blob-sync :local [conf nimbus]
nil)
-(defserverfn service-handler [conf inimbus]
- (.prepare inimbus conf (master-inimbus-dir conf))
- (log-message "Starting Nimbus with conf " conf)
- (let [nimbus (nimbus-data conf inimbus)
- blob-store (:blob-store nimbus)
- principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
+(defn extract-cluster-metrics [^ClusterSummary summ]
+ ; FIXME: this is nearly same to what ui/core have... should we extract
it to another location?
+ (let [sups (.get_supervisors summ)
+ used-slots (reduce + (map #(.get_num_used_workers
^SupervisorSummary %) sups))
+ total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary
%) sups))
+ free-slots (- total-slots used-slots)
+ topologies (.get_topologies_size summ)
+ total-tasks (->> (.get_topologies summ)
+ (map #(.get_num_tasks ^TopologySummary %))
+ (reduce +))
+ total-executors (->> (.get_topologies summ)
+ (map #(.get_num_executors ^TopologySummary %))
+ (reduce +))]
+ {:cluster-info (IClusterMetricsConsumer$ClusterInfo.
(System/currentTimeMillis))
+ :data-points (map
+ (fn [[k v]] (DataPoint. k v))
+ { "supervisors" (count sups)
+ "topologies" topologies
+ "slotsTotal" total-slots
+ "slotsUsed" used-slots
+ "slotsFree" free-slots
+ "executorsTotal" total-executors
+ "tasksTotal" total-tasks}
+ )}))
+
+; TODO: should we move this to another location?
+(defn extract-supervisors-metrics [^ClusterSummary summ]
+ (let [sups (.get_supervisors summ)]
+ (map (fn [^SupervisorSummary sup]
+ {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo.
+ (.get_host sup)
+ (.get_supervisor_id sup)
+ (System/currentTimeMillis))
+ ; FIXME: this is similar to what ui/core have... should we
extract it to another location?
+ :data-points (map
+ (fn [[k v]] (DataPoint. k v))
+ {"slotsTotal" (.get_num_workers sup)
+ "slotsUsed" (.get_num_used_workers sup)
+ "totalMem" (get (.get_total_resources
sup) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
+ "totalCpu" (get (.get_total_resources
sup) Config/SUPERVISOR_CPU_CAPACITY)
+ "usedMem" (.get_used_mem sup)
+ "usedCpu" (.get_used_cpu sup)})
+ })
+ sups)))
+
+(defn send-cluster-metrics-to-executors [nimbus-service nimbus]
+ (let [cluster-summary (.getClusterInfo nimbus-service)
+ cluster-metrics (extract-cluster-metrics cluster-summary)
+ supervisors-metrics (extract-supervisors-metrics cluster-summary)]
+ (dofor
+ [consumer-executor (:cluster-consumer-executors nimbus)]
+ (do
+ (.handleDataPoints consumer-executor (:cluster-info
cluster-metrics) (:data-points cluster-metrics))
+ (dofor
+ [supervisor-metrics supervisors-metrics]
+ (do
+ (log-message (:supervisor-info supervisor-metrics) " / "
(:data-points supervisor-metrics))
+ (.handleDataPoints consumer-executor (:supervisor-info
supervisor-metrics) (:data-points supervisor-metrics))))))))
+
+(defn mk-reified-nimbus [nimbus conf blob-store]
+ (let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
admin-users (or (.get conf NIMBUS-ADMINS) [])
get-common-topo-info
- (fn [^String storm-id operation]
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- topology-conf (try-read-storm-conf conf storm-id
blob-store)
- storm-name (topology-conf TOPOLOGY-NAME)
- _ (check-authorization! nimbus
- storm-name
- topology-conf
- operation)
- topology (try-read-storm-topology storm-id blob-store)
- task->component (storm-task-info topology topology-conf)
- base (.storm-base storm-cluster-state storm-id nil)
- launch-time-secs (if base (:launch-time-secs base)
- (throw
- (NotAliveException. (str
storm-id))))
- assignment (.assignment-info storm-cluster-state
storm-id nil)
- beats (map-val :heartbeat (get @(:heartbeats-cache
nimbus)
- storm-id))
- all-components (set (vals task->component))]
- {:storm-name storm-name
- :storm-cluster-state storm-cluster-state
- :all-components all-components
- :launch-time-secs launch-time-secs
- :assignment assignment
- :beats beats
- :topology topology
- :task->component task->component
- :base base}))
+ (fn [^String storm-id operation]
--- End diff --
It would be great if we could preserve the previous whitespace; it looks like
this part of the diff is showing up unnecessarily.
> Introduce ClusterMetricsConsumer
> --------------------------------
>
> Key: STORM-1723
> URL: https://issues.apache.org/jira/browse/STORM-1723
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Affects Versions: 2.0.0, 1.0.1
> Reporter: Jungtaek Lim
> Assignee: Jungtaek Lim
>
> NOTE: This issue has already been discussed briefly. Please refer
> [here|http://mail-archives.apache.org/mod_mbox/storm-dev/201604.mbox/%3CCAF5108hDCcMKxLXKUYLReOoKkNNdgW2YudweR+mKr=1hlsl...@mail.gmail.com%3E]
> for details.
> This issue focuses on introducing ClusterMetricsConsumer and providing an
> interface that lets users plug in their own consumers.
> ClusterMetricsConsumers will be attached to Nimbus, and the leader Nimbus will
> push cluster-related metrics to the ClusterMetricsConsumers.
> The requirements for ClusterMetricsConsumer are:
> - Only the leader Nimbus should publish cluster metrics to consumers.
> - Nimbus shouldn't be affected by a consumer crashing or responding with high latency.
> - Consumers should be resilient to crashes on their own, or Nimbus should take care of recovering them.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)