Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r209363589

--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() {
         }
 
+    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+        static final int CACHING_WINDOW = 5;
+        static final String SUMMARY = "summary";
+
+        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
+            @Override
+            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
+            }
+        };
+        private final Function<String, Histogram> registerHistogram = (name) -> {
+            //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
+            // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
+            // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
+            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+            clusterSummaryMetrics.put(name, histogram);
+            return histogram;
+        };
+        private volatile boolean active = false;
+
+        //Nimbus metrics distribution
+        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
+
+        //Supervisor metrics distribution
+        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
+        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
+        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
+        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
+        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
+        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
+        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
+
+        //Topology metrics distribution
+        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
+        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
+        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
+        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
+        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
+        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
+        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
+        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
+        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
+        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
+        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+
+        /**
+         * Constructor to put all items in ClusterSummary in MetricSet as a metric.
+         * All metrics are derived from a cached ClusterSummary object,
+         * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
+         * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
+         * reporting interval to avoid outdated reporting.
+         */
+        ClusterSummaryMetricSet() {
+            //Break the code if out of sync to thrift protocol
+            assert ClusterSummary._Fields.values().length == 3
+                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
+                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
+                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
+
+            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+                @Override
+                protected ClusterSummary loadValue() {
+                    try {
+                        if (active) {
+                            ClusterSummary newSummary = getClusterInfoImpl();
+                            LOG.debug("the new summary is {}", newSummary);
+                            //Force update histogram upon each cache refresh
+                            //This behavior relies on the fact that most common implementation of Reporter
--- End diff --

1. I think it'd be better to have all of these metrics in sync. Using a StormTimer is also problematic because there is no way to guarantee when the histograms get updated relative to when reporters decide to report metrics. We would then report out-of-sync values for the gauge and the histograms, which can be confusing.

2. Since the histograms track the distribution of only a single ClusterSummary measurement, the out-of-sync behavior gives us no guarantee as to whether newer values will override older ones that haven't yet been reported, and it would be hard to figure out the correct caching window.

This ultimately boils down to the difference between the active update pattern of histograms and the passive update pattern of gauges, and this is the best hack I can come up with for now to mediate that conflict. An alternative would be to convert all the histograms to gauges, which means manually calculating avg/min/max/percentiles. That would bloat the code greatly, and I don't see a good reason to do it; it's almost like trading one hack for another.
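For illustration only, here is a rough sketch of what that gauge-only alternative could look like for a single field (this is not part of the patch; the class name `GaugeOnlyClusterMetrics` and helper methods are made up, and the thrift getters such as `get_supervisors()` / `get_uptime_secs()` are assumed from the generated Storm classes). Every statistic that a `Histogram` provides for free has to be derived by hand from the cached `ClusterSummary`, and this fan-out would repeat for each of the ~20 fields registered above:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToDoubleFunction;

import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.DerivativeGauge;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.SupervisorSummary;

// Hypothetical sketch of the gauge-only alternative discussed above.
class GaugeOnlyClusterMetrics implements MetricSet {
    private final Map<String, Metric> metrics = new HashMap<>();

    GaugeOnlyClusterMetrics(CachedGauge<ClusterSummary> cachedSummary) {
        // One derived gauge per statistic per field; a single Histogram covers all of these at once.
        registerStats("supervisors:uptime-secs", cachedSummary,
                      ClusterSummary::get_supervisors, SupervisorSummary::get_uptime_secs);
        // ...and so on for every histogram currently registered in ClusterSummaryMetricSet.
    }

    private <T> void registerStats(String prefix, CachedGauge<ClusterSummary> cached,
                                   Function<ClusterSummary, List<T>> extract, ToDoubleFunction<T> value) {
        metrics.put(prefix + ":min", derive(cached, s -> extract.apply(s).stream().mapToDouble(value).min().orElse(0)));
        metrics.put(prefix + ":max", derive(cached, s -> extract.apply(s).stream().mapToDouble(value).max().orElse(0)));
        metrics.put(prefix + ":mean", derive(cached, s -> extract.apply(s).stream().mapToDouble(value).average().orElse(0)));
        // Percentiles would need an explicit sort-and-index pass here as well.
    }

    private Gauge<Double> derive(CachedGauge<ClusterSummary> cached, Function<ClusterSummary, Double> stat) {
        // DerivativeGauge recomputes the statistic from whatever the CachedGauge currently holds,
        // so everything stays passive: nothing is computed until a reporter actually reads the gauge.
        return new DerivativeGauge<ClusterSummary, Double>(cached) {
            @Override
            protected Double transform(ClusterSummary summary) {
                return stat.apply(summary);
            }
        };
    }

    @Override
    public Map<String, Metric> getMetrics() {
        return metrics;
    }
}
```

This keeps gauges and derived statistics trivially in sync (they all read the same cached snapshot), but it replaces each histogram with a hand-rolled set of gauges, which is the code bloat mentioned above.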
---