STORM-3147: Fix minor nits, rebase to use non-static metrics registry
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/392803c9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/392803c9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/392803c9 Branch: refs/heads/master Commit: 392803c9bb2fe81a5df57f695a0e6d7c37bd1f2e Parents: 0242859 Author: Stig Rohde Døssing <s...@apache.org> Authored: Mon Sep 17 22:21:12 2018 +0200 Committer: Stig Rohde Døssing <s...@apache.org> Committed: Mon Sep 17 22:47:33 2018 +0200 ---------------------------------------------------------------------- .../org/apache/storm/daemon/nimbus/Nimbus.java | 72 ++++++++++---------- .../storm/metric/StormMetricsRegistry.java | 13 ++++ 2 files changed, 50 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/392803c9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 36eca99..45c1c87 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -22,6 +22,7 @@ import com.codahale.metrics.CachedGauge; import com.codahale.metrics.DerivativeGauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; import com.codahale.metrics.SlidingTimeWindowReservoir; import com.codahale.metrics.Timer; @@ -449,6 +450,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private final IPrincipalToLocal principalToLocal; private final StormMetricsRegistry metricsRegistry; private final ResourceMetrics resourceMetrics; + private final ClusterSummaryMetricSet clusterMetricSet; private MetricStore metricsStore; private IAuthorizer authorizationHandler; //Cached CuratorFramework, mainly used for BlobStore. @@ -600,7 +602,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(conf); this.supervisorClasspaths = Collections.unmodifiableNavigableMap( Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list - clusterMetricSet = new ClusterSummaryMetricSet(); + clusterMetricSet = new ClusterSummaryMetricSet(metricsRegistry); } // TOPOLOGY STATE TRANSITIONS @@ -2946,19 +2948,19 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } }); - StormMetricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values() + metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values() .parallelStream() .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0)) .sum()); - StormMetricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values() + metricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values() .parallelStream() .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(), 0)) .sum()); - StormMetricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values() + metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values() .parallelStream() .mapToDouble(SupervisorResources::getTotalMem) .sum()); - StormMetricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values() + metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values() .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); @@ -2982,9 +2984,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { throw new RuntimeException(e); } }); - - //Should we make the delaySecs and recurSecs in sync with any conf value? - // They should be around the reporting interval, but it's not configurable + timer.scheduleRecurring(5, 5, clusterMetricSet); } catch (Exception e) { if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) { @@ -4661,7 +4661,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { if (metricsStore != null) { metricsStore.close(); } - //Put after timer close to avoid race condition clusterMetricSet.setActive(false); LOG.info("Shut down master"); } catch (Exception e) { @@ -4798,16 +4797,25 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } - private class ClusterSummaryMetricSet implements MetricSet, Runnable { - static final int CACHING_WINDOW = 5; - static final String SUMMARY = "summary"; + private static class ClusterSummaryMetrics implements MetricSet { + private static final String SUMMARY = "summary"; + private final Map<String, com.codahale.metrics.Metric> metrics = new HashMap<>(); + + public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { + return metrics.put(MetricRegistry.name(SUMMARY, key), value); + } - private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() { - @Override - public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { - return super.put(StormMetricsRegistry.name(SUMMARY, key), value); -} - }; + @Override + public Map<String, com.codahale.metrics.Metric> getMetrics() { + return metrics; + } + } + + private class ClusterSummaryMetricSet implements Runnable { + private static final int CACHING_WINDOW = 5; + + private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics(); + private final Function<String, Histogram> registerHistogram = (name) -> { //This histogram reflects the data distribution across only one ClusterSummary, i.e., // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. @@ -4842,6 +4850,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + private final StormMetricsRegistry metricsRegistry; /** * Constructor to put all items in ClusterSummary in MetricSet as a metric. @@ -4850,7 +4859,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than * reporting interval to avoid outdated reporting. */ - ClusterSummaryMetricSet() { + ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) { + this.metricsRegistry = metricsRegistry; //Break the code if out of sync to thrift protocol assert ClusterSummary._Fields.values().length == 3 && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS @@ -4862,11 +4872,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { protected ClusterSummary loadValue() { try { ClusterSummary newSummary = getClusterInfoImpl(); - LOG.debug("the new summary is {}", newSummary); - //Force update histogram upon each cache refresh - //This behavior relies on the fact that most common implementation of Reporter - // reports Gauges before Histograms. Because DerivativeGauge will trigger cache - // refresh upon reporter's query, histogram will also be updated before query + LOG.debug("The new summary is {}", newSummary); + /* + * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before + * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be + * updated before query + */ updateHistogram(newSummary); return newSummary; } catch (Exception e) { @@ -4966,29 +4977,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } - //This is not thread safe void setActive(final boolean active) { if (this.active != active) { this.active = active; if (active) { - StormMetricsRegistry.registerMetricSet(this); + metricsRegistry.registerAll(clusterSummaryMetrics); } else { - //Could be replaced when metrics support remove all functions - // https://github.com/dropwizard/metrics/pull/1280 - StormMetricsRegistry.unregisterMetricSet(this); + metricsRegistry.removeAll(clusterSummaryMetrics); } } } @Override - public Map<String, com.codahale.metrics.Metric> getMetrics() { - return clusterSummaryMetrics; - } - - @Override public void run() { try { - //State changed setActive(isLeader()); } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/storm/blob/392803c9/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java index c4d2f3f..cc98804 100644 --- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java +++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java @@ -15,7 +15,9 @@ package org.apache.storm.metric; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.MetricSet; import com.codahale.metrics.Reservoir; import com.codahale.metrics.Timer; import java.util.List; @@ -54,6 +56,17 @@ public class StormMetricsRegistry { public <V> Gauge<V> registerGauge(final String name, Gauge<V> gauge) { return registry.gauge(name, () -> gauge); } + + public void registerAll(MetricSet metrics) { + registry.registerAll(metrics); + } + + public void removeAll(MetricSet metrics) { + //Could be replaced when metrics support remove all functions + // https://github.com/dropwizard/metrics/pull/1280 + Map<String, Metric> nameToMetric = metrics.getMetrics(); + registry.removeMatching((name, metric) -> nameToMetric.containsKey(name)); + } public void startMetricsReporters(Map<String, Object> daemonConf) { reporters = MetricsUtils.getPreparableReporters(daemonConf);