Repository: hadoop
Updated Branches:
  refs/heads/trunk 2c3da25fd -> 1942364ef


HADOOP-11105. MetricsSystemImpl could leak memory in registered callbacks. 
Contributed by Chuan Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1942364e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1942364e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1942364e

Branch: refs/heads/trunk
Commit: 1942364ef14396e9bd94a87c0d901ff9abe1d42a
Parents: 2c3da25
Author: cnauroth <cnaur...@apache.org>
Authored: Thu Sep 18 15:36:43 2014 -0700
Committer: cnauroth <cnaur...@apache.org>
Committed: Thu Sep 18 15:36:43 2014 -0700

----------------------------------------------------------------------
 .../hadoop/metrics2/impl/MetricsSystemImpl.java | 35 ++++++++++++++++----
 1 file changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1942364e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
index 722abd9..2107e68 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
@@ -83,7 +83,12 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
   private final Map<String, MetricsSource> allSources;
   private final Map<String, MetricsSinkAdapter> sinks;
   private final Map<String, MetricsSink> allSinks;
+
+  // The callback list is used by register(Callback callback), while
+  // the callback map is used by register(String name, String desc, T sink)
   private final List<Callback> callbacks;
+  private final Map<String, Callback> namedCallbacks;
+
   private final MetricsCollectorImpl collector;
   private final MetricsRegistry registry = new MetricsRegistry(MS_NAME);
   @Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat;
@@ -119,6 +124,7 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
     sourceConfigs = Maps.newHashMap();
     sinkConfigs = Maps.newHashMap();
     callbacks = Lists.newArrayList();
+    namedCallbacks = Maps.newHashMap();
     injectedTags = Lists.newArrayList();
     collector = new MetricsCollectorImpl();
     if (prefix != null) {
@@ -178,11 +184,13 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
       return;
     }
     for (Callback cb : callbacks) cb.preStart();
+    for (Callback cb : namedCallbacks.values()) cb.preStart();
     configure(prefix);
     startTimer();
     monitoring = true;
     LOG.info(prefix +" metrics system started");
     for (Callback cb : callbacks) cb.postStart();
+    for (Callback cb : namedCallbacks.values()) cb.postStart();
   }
 
   @Override
@@ -198,6 +206,7 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
       return;
     }
     for (Callback cb : callbacks) cb.preStop();
+    for (Callback cb : namedCallbacks.values()) cb.preStop();
     LOG.info("Stopping "+ prefix +" metrics system...");
     stopTimer();
     stopSources();
@@ -206,6 +215,7 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
     monitoring = false;
     LOG.info(prefix +" metrics system stopped.");
     for (Callback cb : callbacks) cb.postStop();
+    for (Callback cb : namedCallbacks.values()) cb.postStop();
   }
 
   @Override public synchronized <T>
@@ -224,7 +234,7 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
     }
     // We want to re-register the source to pick up new config when the
     // metrics system restarts.
-    register(new AbstractCallback() {
+    register(name, new AbstractCallback() {
       @Override public void postStart() {
         registerSource(finalName, finalDesc, s);
       }
@@ -241,6 +251,9 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
     if (allSources.containsKey(name)) {
       allSources.remove(name);
     }
+    if (namedCallbacks.containsKey(name)) {
+      namedCallbacks.remove(name);
+    }
   }
 
   synchronized
@@ -268,7 +281,7 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
     }
     // We want to re-register the sink to pick up new config
     // when the metrics system restarts.
-    register(new AbstractCallback() {
+    register(name, new AbstractCallback() {
       @Override public void postStart() {
         register(name, description, sink);
       }
@@ -289,9 +302,16 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
 
   @Override
   public synchronized void register(final Callback callback) {
-    callbacks.add((Callback) Proxy.newProxyInstance(
-        callback.getClass().getClassLoader(), new Class<?>[] { Callback.class 
},
-        new InvocationHandler() {
+    callbacks.add((Callback) getProxyForCallback(callback));
+  }
+
+  private synchronized void register(String name, final Callback callback) {
+    namedCallbacks.put(name, (Callback) getProxyForCallback(callback));
+  }
+
+  private Object getProxyForCallback(final Callback callback) {
+    return Proxy.newProxyInstance(callback.getClass().getClassLoader(),
+        new Class<?>[] { Callback.class }, new InvocationHandler() {
           @Override
           public Object invoke(Object proxy, Method method, Object[] args)
               throws Throwable {
@@ -299,11 +319,11 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
               return method.invoke(callback, args);
             } catch (Exception e) {
               // These are not considered fatal.
-              LOG.warn("Caught exception in callback "+ method.getName(), e);
+              LOG.warn("Caught exception in callback " + method.getName(), e);
             }
             return null;
           }
-        }));
+        });
   }
 
   @Override
@@ -572,6 +592,7 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
     allSources.clear();
     allSinks.clear();
     callbacks.clear();
+    namedCallbacks.clear();
     if (mbeanName != null) {
       MBeans.unregister(mbeanName);
       mbeanName = null;

Reply via email to