STORM-3147: Fix minor nits, rebase to use non-static metrics registry

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/392803c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/392803c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/392803c9

Branch: refs/heads/master
Commit: 392803c9bb2fe81a5df57f695a0e6d7c37bd1f2e
Parents: 0242859
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Mon Sep 17 22:21:12 2018 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Mon Sep 17 22:47:33 2018 +0200

----------------------------------------------------------------------
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 72 ++++++++++----------
 .../storm/metric/StormMetricsRegistry.java      | 13 ++++
 2 files changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/392803c9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 36eca99..45c1c87 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.CachedGauge;
 import com.codahale.metrics.DerivativeGauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.SlidingTimeWindowReservoir;
 import com.codahale.metrics.Timer;
@@ -449,6 +450,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final IPrincipalToLocal principalToLocal;
     private final StormMetricsRegistry metricsRegistry;
     private final ResourceMetrics resourceMetrics;
+    private final ClusterSummaryMetricSet clusterMetricSet;
     private MetricStore metricsStore;
     private IAuthorizer authorizationHandler;
     //Cached CuratorFramework, mainly used for BlobStore.
@@ -600,7 +602,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(conf);
         this.supervisorClasspaths = Collections.unmodifiableNavigableMap(
             Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list
-        clusterMetricSet = new ClusterSummaryMetricSet();
+        clusterMetricSet = new ClusterSummaryMetricSet(metricsRegistry);
     }
 
     // TOPOLOGY STATE TRANSITIONS
@@ -2946,19 +2948,19 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                                         }
                                     });
 
-            StormMetricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
+            metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
                 .parallelStream()
                 .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
                 .sum());
-            StormMetricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
+            metricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
                 .parallelStream()
                 .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(), 0))
                 .sum());
-            StormMetricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
+            metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
                 .parallelStream()
                 .mapToDouble(SupervisorResources::getTotalMem)
                 .sum());
-            StormMetricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
+            metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
                 .parallelStream()
                 .mapToDouble(SupervisorResources::getTotalCpu)
                 .sum());
@@ -2982,9 +2984,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                                             throw new RuntimeException(e);
                                         }
                                     });
-
-            //Should we make the delaySecs and recurSecs in sync with any conf value?
-            // They should be around the reporting interval, but it's not configurable
+            
             timer.scheduleRecurring(5, 5, clusterMetricSet);
         } catch (Exception e) {
             if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
@@ -4661,7 +4661,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (metricsStore != null) {
                 metricsStore.close();
             }
-            //Put after timer close to avoid race condition
             clusterMetricSet.setActive(false);
             LOG.info("Shut down master");
         } catch (Exception e) {
@@ -4798,16 +4797,25 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
 
     }
 
-    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
-        static final int CACHING_WINDOW = 5;
-        static final String SUMMARY = "summary";
+    private static class ClusterSummaryMetrics implements MetricSet {
+        private static final String SUMMARY = "summary";
+        private final Map<String, com.codahale.metrics.Metric> metrics = new HashMap<>();
+        
+        public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+            return metrics.put(MetricRegistry.name(SUMMARY, key), value);
+        }
 
-        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
-            @Override
-            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
-                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
-            }
-        };
+        @Override
+        public Map<String, com.codahale.metrics.Metric> getMetrics() {
+            return metrics;
+        }
+    }
+    
+    private class ClusterSummaryMetricSet implements Runnable {
+        private static final int CACHING_WINDOW = 5;
+        
+        private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();
+        
         private final Function<String, Histogram> registerHistogram = (name) -> {
             //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
             // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
@@ -4842,6 +4850,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
         private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
         private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+        private final StormMetricsRegistry metricsRegistry;
 
         /**
          * Constructor to put all items in ClusterSummary in MetricSet as a metric.
@@ -4850,7 +4859,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
          * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
          * reporting interval to avoid outdated reporting.
          */
-        ClusterSummaryMetricSet() {
+        ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
+            this.metricsRegistry = metricsRegistry;
             //Break the code if out of sync to thrift protocol
             assert ClusterSummary._Fields.values().length == 3
                 && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
@@ -4862,11 +4872,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 protected ClusterSummary loadValue() {
                     try {
                         ClusterSummary newSummary = getClusterInfoImpl();
-                        LOG.debug("the new summary is {}", newSummary);
-                        //Force update histogram upon each cache refresh
-                        //This behavior relies on the fact that most common implementation of Reporter
-                        // reports Gauges before Histograms. Because DerivativeGauge will trigger cache
-                        // refresh upon reporter's query, histogram will also be updated before query
+                        LOG.debug("The new summary is {}", newSummary);
+                        /*
+                         * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
+                         * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
+                         * updated before query
+                         */
                         updateHistogram(newSummary);
                         return newSummary;
                     } catch (Exception e) {
@@ -4966,29 +4977,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
         }
 
-        //This is not thread safe
         void setActive(final boolean active) {
             if (this.active != active) {
                 this.active = active;
                 if (active) {
-                    StormMetricsRegistry.registerMetricSet(this);
+                    metricsRegistry.registerAll(clusterSummaryMetrics);
                 } else {
-                    //Could be replaced when metrics support remove all functions
-                    // https://github.com/dropwizard/metrics/pull/1280
-                    StormMetricsRegistry.unregisterMetricSet(this);
+                    metricsRegistry.removeAll(clusterSummaryMetrics);
                 }
             }
         }
 
         @Override
-        public Map<String, com.codahale.metrics.Metric> getMetrics() {
-            return clusterSummaryMetrics;
-        }
-
-        @Override
         public void run() {
             try {
-                //State changed
                 setActive(isLeader());
             } catch (Exception e) {
                 throw new RuntimeException(e);
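
[Reviewer note, not part of the commit] The comment block updated above describes how the cached ClusterSummary drives the histograms. Below is a minimal, self-contained sketch of that CachedGauge/DerivativeGauge interplay; the Summary class, field values, caching window, and metric names are illustrative stand-ins, not code from this patch.

// Illustrative sketch only: the CachedGauge/DerivativeGauge pattern the updated comment refers to.
import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.DerivativeGauge;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.TimeUnit;

public class CachedSummarySketch {
    static class Summary {
        int supervisors;
        int topologies;
    }

    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        Histogram topologyCount = registry.histogram("summary.topologies");

        // The summary is recomputed at most once per caching window; recomputing it is also
        // the moment the histogram is fed, mirroring loadValue() -> updateHistogram(...).
        CachedGauge<Summary> cachedSummary = new CachedGauge<Summary>(5, TimeUnit.SECONDS) {
            @Override
            protected Summary loadValue() {
                Summary summary = new Summary();
                summary.supervisors = 3; // stand-in for getClusterInfoImpl()
                summary.topologies = 7;
                topologyCount.update(summary.topologies);
                return summary;
            }
        };

        // Child gauges derive their values from the shared cached summary, so a reporter that
        // reads the gauges first refreshes the cache (and the histogram) before reading histograms.
        Gauge<Integer> supervisorCount = new DerivativeGauge<Summary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(Summary summary) {
                return summary.supervisors;
            }
        };
        registry.register("summary.cluster:num-supervisors", supervisorCount);
        System.out.println(supervisorCount.getValue()); // prints 3
    }
}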

http://git-wip-us.apache.org/repos/asf/storm/blob/392803c9/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index c4d2f3f..cc98804 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -15,7 +15,9 @@ package org.apache.storm.metric;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
 import com.codahale.metrics.Timer;
 import java.util.List;
@@ -54,6 +56,17 @@ public class StormMetricsRegistry {
     public <V> Gauge<V> registerGauge(final String name, Gauge<V> gauge) {
         return registry.gauge(name, () -> gauge);
     }
+    
+    public void registerAll(MetricSet metrics) {
+        registry.registerAll(metrics);
+    }
+    
+    public void removeAll(MetricSet metrics) {
+        //Could be replaced when metrics support remove all functions
+        // https://github.com/dropwizard/metrics/pull/1280
+        Map<String, Metric> nameToMetric = metrics.getMetrics();
+        registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
+    }
 
     public void startMetricsReporters(Map<String, Object> daemonConf) {
         reporters = MetricsUtils.getPreparableReporters(daemonConf);
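
[Reviewer note, not part of the commit] A short usage sketch of the new instance-level registerAll/removeAll pair, assuming a caller that toggles a MetricSet on leadership changes the way Nimbus does. The throwaway MetricSet, metric name, and gauge value are made up for illustration; ClusterSummaryMetrics plays the role of "set" in the patch.

// Illustrative sketch only: registering and unregistering a MetricSet against a plain MetricRegistry.
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import java.util.HashMap;
import java.util.Map;

public class MetricSetToggleSketch {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();

        // A tiny MetricSet, analogous to ClusterSummaryMetrics.
        Map<String, Metric> metrics = new HashMap<>();
        metrics.put(MetricRegistry.name("summary", "cluster:num-supervisors"),
                    (Gauge<Integer>) () -> 3);
        MetricSet set = () -> metrics;

        // On gaining leadership: register every metric in the set.
        registry.registerAll(set);

        // On losing leadership: metrics-core has no removeAll(MetricSet) yet
        // (https://github.com/dropwizard/metrics/pull/1280), so remove by matching the
        // set's names, which is what the new StormMetricsRegistry.removeAll does.
        Map<String, Metric> names = set.getMetrics();
        registry.removeMatching((name, metric) -> names.containsKey(name));

        System.out.println(registry.getMetrics().isEmpty()); // prints true
    }
}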
