Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209041653 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() { } + //enum NotPorted { + // //Declared in StormConf. I don't see the value in reporting so. + // SUPERVISOR_TOTAL_RESOURCE, + // //May be able to aggregate based on status; + // TOPOLOGY_STATUS, + // TOPOLOGY_SCHED_STATUS, + // //May be aggregated, as well as other distinct values + // NUM_DISTINCT_NIMBUS_VERSION; + //} + + private class ClusterSummaryMetricSet implements MetricSet, Runnable { + static final int CACHING_WINDOW = 5; + static final int PORTED_METRICS = 25; + static final String SUMMARY = "summary"; + + private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS); + private final Function<String, Histogram> registerHistogram = (name) -> { + final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); + ported.put(name, histogram); + return histogram; + }; + private volatile boolean active = false; + + //Nimbus metrics distribution + private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + + //Supervisor metrics distribution + private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); + private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); + private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); + private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); + private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); + private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); + private final Histogram 
supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + + //Topology metrics distribution + private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); + private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); + private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); + private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); + private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); + private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); + private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); + private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); + private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); + private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); + private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + + ClusterSummaryMetricSet() { + //Break the code if out of sync with the thrift protocol + assert ClusterSummary._Fields.values().length == 3 + && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS + && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES + && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + + final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) { + @Override + protected ClusterSummary loadValue() { + try { + if (active) { + ClusterSummary newSummary = getClusterInfoImpl(); + LOG.info("the new summary is {}", 
newSummary); + //This is ugly but I can't think of a better way to update the histograms only once per caching window + // It also kind of depends on the implementation detail that gauges get updated before histograms + updateHistogram(newSummary); --- End diff -- Yes, because I want to collect the distribution of values (let's say the assigned CPU) across all entities (daemons, or topologies) only within one ClusterSummary (so it's the distribution of assigned CPU among all supervisors at one moment, snapshotted in ClusterSummary). It doesn't make sense to me to accumulate both across entities and across time.
---