Repository: flink
Updated Branches:
  refs/heads/master ccc86e9f1 -> d760bbc96
[hotfix] [core] Re-enable JMX metrics as the default. This also does some minor code cleanups, logging fixes, and smoother concurrency handling. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d760bbc9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d760bbc9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d760bbc9 Branch: refs/heads/master Commit: d760bbc962ea34983f3dad5e72fdb9d6a3d41c15 Parents: ccc86e9 Author: Stephan Ewen <se...@apache.org> Authored: Wed Jun 8 21:02:54 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Jun 9 21:14:56 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/metrics/MetricRegistry.java | 12 ++++++++--- .../flink/metrics/reporter/JMXReporter.java | 2 +- .../flink/metrics/statsd/StatsDReporter.java | 21 +++++++++++++++----- .../flink/runtime/taskmanager/TaskManager.scala | 8 ++++++-- 4 files changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d760bbc9/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java index 52a44cf..b3422e1 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -57,7 +57,7 @@ public class MetricRegistry { // configuration keys // ------------------------------------------------------------------------ - private static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); + static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); private final MetricReporter reporter; private final 
java.util.Timer timer; @@ -83,7 +83,9 @@ public class MetricRegistry { final String className = config.getString(KEY_METRICS_REPORTER_CLASS, null); if (className == null) { - this.reporter = null; + // by default, create JMX metrics + LOG.info("No metrics reporter configured, exposing metrics via JMX"); + this.reporter = new JMXReporter(); this.timer = null; } else { @@ -239,7 +241,11 @@ public class MetricRegistry { @Override public void run() { - reporter.report(); + try { + reporter.report(); + } catch (Throwable t) { + LOG.warn("Error while reporting metrics", t); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d760bbc9/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java index 71c80de..db81164 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; - import org.apache.flink.metrics.groups.AbstractMetricGroup; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/d760bbc9/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java index 124b21d..ae57f55 100644 --- 
a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -33,7 +33,9 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.SocketException; +import java.util.ConcurrentModificationException; import java.util.Map; +import java.util.NoSuchElementException; /** * Largely based on the StatsDReporter class by ReadyTalk @@ -88,12 +90,21 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { @Override public void report() { - for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) { - reportGauge(entry.getValue(), entry.getKey()); - } + // instead of locking here, we tolerate exceptions + // we do this to prevent holding the lock for very long and blocking + // operator creation and shutdown + try { + for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) { + reportGauge(entry.getValue(), entry.getKey()); + } - for (Map.Entry<Counter, String> entry : counters.entrySet()) { - reportCounter(entry.getValue(), entry.getKey()); + for (Map.Entry<Counter, String> entry : counters.entrySet()) { + reportCounter(entry.getValue(), entry.getKey()); + } + } + catch (ConcurrentModificationException | NoSuchElementException e) { + // ignore - may happen when metrics are concurrently added or removed + // report next time } } http://git-wip-us.apache.org/repos/asf/flink/blob/d760bbc9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index ecf8d1a..8ef22af 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -265,9 +265,13 @@ class TaskManager( case t: Exception => log.error("FileCache did not shutdown properly.", t) } + // failsafe shutdown of the metrics registry try { - //enable this before merging - //metricsRegistry.shutdown() + val reg = metricsRegistry + metricsRegistry = null + if (reg != null) { + reg.shutdown() + } } catch { case t: Exception => log.error("MetricRegistry did not shutdown properly.", t) }