Repository: flink
Updated Branches:
  refs/heads/master ccc86e9f1 -> d760bbc96


[hotfix] [core] Re-enable JMX metrics as the default.

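This also includes some minor code cleanups and logging fixes, and makes concurrency
handling more robust (metric reporting tolerates concurrent modification, and the
metric registry is shut down failsafe).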


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d760bbc9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d760bbc9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d760bbc9

Branch: refs/heads/master
Commit: d760bbc962ea34983f3dad5e72fdb9d6a3d41c15
Parents: ccc86e9
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jun 8 21:02:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jun 9 21:14:56 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/metrics/MetricRegistry.java    | 12 ++++++++---
 .../flink/metrics/reporter/JMXReporter.java     |  2 +-
 .../flink/metrics/statsd/StatsDReporter.java    | 21 +++++++++++++++-----
 .../flink/runtime/taskmanager/TaskManager.scala |  8 ++++++--
 4 files changed, 32 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d760bbc9/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index 52a44cf..b3422e1 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -57,7 +57,7 @@ public class MetricRegistry {
        //  configuration keys
        // 
------------------------------------------------------------------------
        
-       private static final Logger LOG = 
LoggerFactory.getLogger(MetricRegistry.class);
+       static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
        
        private final MetricReporter reporter;
        private final java.util.Timer timer;
@@ -83,7 +83,9 @@ public class MetricRegistry {
                
                final String className = 
config.getString(KEY_METRICS_REPORTER_CLASS, null);
                if (className == null) {
-                       this.reporter = null;
+                       // by default, create JMX metrics
+                       LOG.info("No metrics reporter configured, exposing 
metrics via JMX");
+                       this.reporter = new JMXReporter();
                        this.timer = null;
                }
                else {
@@ -239,7 +241,11 @@ public class MetricRegistry {
 
                @Override
                public void run() {
-                       reporter.report();
+                       try {
+                               reporter.report();
+                       } catch (Throwable t) {
+                               LOG.warn("Error while reporting metrics", t);
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d760bbc9/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
index 71c80de..db81164 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
-
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d760bbc9/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 124b21d..ae57f55 100644
--- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -33,7 +33,9 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.SocketException;
+import java.util.ConcurrentModificationException;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 /**
  * Largely based on the StatsDReporter class by ReadyTalk
@@ -88,12 +90,21 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 
        @Override
        public void report() {
-               for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
-                       reportGauge(entry.getValue(), entry.getKey());
-               }
+               // instead of locking here, we tolerate exceptions
+               // we do this to prevent holding the lock for very long and 
blocking
+               // operator creation and shutdown
+               try {
+                       for (Map.Entry<Gauge<?>, String> entry : 
gauges.entrySet()) {
+                               reportGauge(entry.getValue(), entry.getKey());
+                       }
 
-               for (Map.Entry<Counter, String> entry : counters.entrySet()) {
-                       reportCounter(entry.getValue(), entry.getKey());
+                       for (Map.Entry<Counter, String> entry : 
counters.entrySet()) {
+                               reportCounter(entry.getValue(), entry.getKey());
+                       }
+               }
+               catch (ConcurrentModificationException | NoSuchElementException 
e) {
+                       // ignore - may happen when metrics are concurrently 
added or removed
+                       // report next time
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d760bbc9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index ecf8d1a..8ef22af 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -265,9 +265,13 @@ class TaskManager(
       case t: Exception => log.error("FileCache did not shutdown properly.", t)
     }
     
+    // failsafe shutdown of the metrics registry
     try {
-      //enable this before merging
-      //metricsRegistry.shutdown()
+      val reg = metricsRegistry
+      metricsRegistry = null
+      if (reg != null) {
+        reg.shutdown()
+      }
     } catch {
       case t: Exception => log.error("MetricRegistry did not shutdown 
properly.", t)
     }

Reply via email to