[FLINK-7876] Merge TaskExecutorMetricsInitializer and MetricUtils. This commit removes the TaskExecutorMetricsInitializer and moves its methods to MetricUtils.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fb7e0b9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fb7e0b9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fb7e0b9 Branch: refs/heads/master Commit: 7fb7e0b9775d1773d20e63732130ae140781a6f2 Parents: ad42ee2 Author: Till Rohrmann <[email protected]> Authored: Wed Nov 1 12:31:52 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Wed Nov 1 15:52:01 2017 +0100 ---------------------------------------------------------------------- .../runtime/jobmaster/JobManagerRunner.java | 3 +- .../flink/runtime/metrics/util/MetricUtils.java | 230 +++++++++-------- .../utils/TaskExecutorMetricsInitializer.java | 257 ------------------- .../flink/runtime/jobmanager/JobManager.scala | 3 +- 4 files changed, 128 insertions(+), 365 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 14baa6f..f95b5a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.rpc.FatalErrorHandler; import 
org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.ExceptionUtils; @@ -127,7 +128,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty"); final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress(); - jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress); + jobManagerMetrics = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostAddress); this.jobManagerMetricGroup = jobManagerMetrics; // libraries and class loader first http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 08353e3..2a59a7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -22,23 +22,27 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; -import org.apache.commons.lang3.text.WordUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.management.BufferPoolMXBean; +import javax.management.AttributeNotFoundException; +import 
javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; + import java.lang.management.ClassLoadingMXBean; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; -import java.lang.management.OperatingSystemMXBean; import java.lang.management.ThreadMXBean; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.List; /** @@ -51,6 +55,21 @@ public class MetricUtils { private MetricUtils() { } + public static JobManagerMetricGroup instantiateJobManagerMetricGroup( + final MetricRegistry metricRegistry, + final String hostname) { + final JobManagerMetricGroup jobManagerMetricGroup = new JobManagerMetricGroup( + metricRegistry, + hostname); + + MetricGroup statusGroup = jobManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME); + + // initialize the JM metrics + instantiateStatusMetrics(statusGroup); + + return jobManagerMetricGroup; + } + public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup( MetricRegistry metricRegistry, TaskManagerLocation taskManagerLocation, @@ -60,59 +79,55 @@ public class MetricUtils { taskManagerLocation.getHostname(), taskManagerLocation.getResourceID().toString()); + MetricGroup statusGroup = taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME); + // Initialize the TM metrics - TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network); + instantiateStatusMetrics(statusGroup); + instantiateNetworkMetrics(statusGroup, network); return taskManagerMetricGroup; } - public static void instantiateNetworkMetrics( - MetricGroup metrics, - final NetworkEnvironment network) { - MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME); + public static void 
instantiateStatusMetrics( + MetricGroup metricGroup) { + MetricGroup jvm = metricGroup.addGroup("JVM"); - MetricGroup networkGroup = status - .addGroup("Network"); + instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader")); + instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector")); + instantiateMemoryMetrics(jvm.addGroup("Memory")); + instantiateThreadMetrics(jvm.addGroup("Threads")); + instantiateCPUMetrics(jvm.addGroup("CPU")); + } - networkGroup.gauge("TotalMemorySegments", new Gauge<Integer>() { + private static void instantiateNetworkMetrics( + MetricGroup metrics, + final NetworkEnvironment network) { + metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () { @Override - public Integer getValue() { - return network.getNetworkBufferPool().getTotalNumberOfMemorySegments(); + public Long getValue() { + return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments(); } }); - networkGroup.gauge("AvailableMemorySegments", new Gauge<Integer>() { + + metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () { @Override - public Integer getValue() { - return network.getNetworkBufferPool().getNumberOfAvailableMemorySegments(); + public Long getValue() { + return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments(); } }); } - public static void instantiateStatusMetrics( - MetricGroup metrics) { - MetricGroup status = metrics - .addGroup(METRIC_GROUP_STATUS_NAME); - - MetricGroup jvm = status - .addGroup("JVM"); - - instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader")); - instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector")); - instantiateMemoryMetrics(jvm.addGroup("Memory")); - instantiateThreadMetrics(jvm.addGroup("Threads")); - instantiateCPUMetrics(jvm.addGroup("CPU")); - } - private static void instantiateClassLoaderMetrics(MetricGroup metrics) { final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean(); - metrics.gauge("ClassesLoaded", 
new Gauge<Long>() { + metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () { @Override public Long getValue() { return mxBean.getTotalLoadedClassCount(); } }); - metrics.gauge("ClassesUnloaded", new Gauge<Long>() { + + metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () { @Override public Long getValue() { return mxBean.getUnloadedClassCount(); @@ -123,15 +138,17 @@ public class MetricUtils { private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) { List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans(); - for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) { + for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) { MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName()); - gcGroup.gauge("Count", new Gauge<Long>() { + + gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () { @Override public Long getValue() { return garbageCollector.getCollectionCount(); } }); - gcGroup.gauge("Time", new Gauge<Long>() { + + gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () { @Override public Long getValue() { return garbageCollector.getCollectionTime(); @@ -142,20 +159,22 @@ public class MetricUtils { private static void instantiateMemoryMetrics(MetricGroup metrics) { final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + MetricGroup heap = metrics.addGroup("Heap"); - heap.gauge("Used", new Gauge<Long>() { + + heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () { @Override public Long getValue() { return mxBean.getHeapMemoryUsage().getUsed(); } }); - heap.gauge("Committed", new Gauge<Long>() { + heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () { @Override public Long getValue() { return mxBean.getHeapMemoryUsage().getCommitted(); } }); - heap.gauge("Max", new Gauge<Long>() { + heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () { @Override public Long getValue() { return 
mxBean.getHeapMemoryUsage().getMax(); @@ -163,54 +182,61 @@ public class MetricUtils { }); MetricGroup nonHeap = metrics.addGroup("NonHeap"); - nonHeap.gauge("Used", new Gauge<Long>() { + + nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () { @Override public Long getValue() { return mxBean.getNonHeapMemoryUsage().getUsed(); } }); - nonHeap.gauge("Committed", new Gauge<Long>() { + nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () { @Override public Long getValue() { return mxBean.getNonHeapMemoryUsage().getCommitted(); } }); - nonHeap.gauge("Max", new Gauge<Long>() { + nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () { @Override public Long getValue() { return mxBean.getNonHeapMemoryUsage().getMax(); } }); - List<BufferPoolMXBean> bufferMxBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + final MBeanServer con = ManagementFactory.getPlatformMBeanServer(); - for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) { - MetricGroup bufferGroup = metrics.addGroup(WordUtils.capitalize(bufferMxBean.getName())); - bufferGroup.gauge("Count", new Gauge<Long>() { - @Override - public Long getValue() { - return bufferMxBean.getCount(); - } - }); - bufferGroup.gauge("MemoryUsed", new Gauge<Long>() { - @Override - public Long getValue() { - return bufferMxBean.getMemoryUsed(); - } - }); - bufferGroup.gauge("TotalCapacity", new Gauge<Long>() { - @Override - public Long getValue() { - return bufferMxBean.getTotalCapacity(); - } - }); + final String directBufferPoolName = "java.nio:type=BufferPool,name=direct"; + + try { + final ObjectName directObjectName = new ObjectName(directBufferPoolName); + + MetricGroup direct = metrics.addGroup("Direct"); + + direct.<Long, Gauge<Long>>gauge("Count", new AttributeGauge<>(con, directObjectName, "Count", -1L)); + direct.<Long, Gauge<Long>>gauge("MemoryUsed", new AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L)); + direct.<Long, Gauge<Long>>gauge("TotalCapacity", new 
AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L)); + } catch (MalformedObjectNameException e) { + LOG.warn("Could not create object name {}.", directBufferPoolName, e); + } + + final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped"; + + try { + final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName); + + MetricGroup mapped = metrics.addGroup("Mapped"); + + mapped.<Long, Gauge<Long>>gauge("Count", new AttributeGauge<>(con, mappedObjectName, "Count", -1L)); + mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L)); + mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L)); + } catch (MalformedObjectNameException e) { + LOG.warn("Could not create object name {}.", mappedBufferPoolName, e); } } private static void instantiateThreadMetrics(MetricGroup metrics) { final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); - metrics.gauge("Count", new Gauge<Integer>() { + metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () { @Override public Integer getValue() { return mxBean.getThreadCount(); @@ -220,54 +246,48 @@ public class MetricUtils { private static void instantiateCPUMetrics(MetricGroup metrics) { try { - final OperatingSystemMXBean mxBean = ManagementFactory.getOperatingSystemMXBean(); - - final Method fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean") - .getMethod("getProcessCpuLoad"); - // verify that we can invoke the method - fetchCPULoadMethod.invoke(mxBean); + final com.sun.management.OperatingSystemMXBean mxBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); - final Method fetchCPUTimeMethod = Class.forName("com.sun.management.OperatingSystemMXBean") - .getMethod("getProcessCpuTime"); - // verify that we can invoke the method - fetchCPUTimeMethod.invoke(mxBean); - - metrics.gauge("Load", new 
Gauge<Double>() { + metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () { @Override public Double getValue() { - try { - return (Double) fetchCPULoadMethod.invoke(mxBean); - } catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) { - return -1.0; - } + return mxBean.getProcessCpuLoad(); } }); - metrics.gauge("Time", new Gauge<Long>() { + metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () { @Override public Long getValue() { - try { - return (Long) fetchCPUTimeMethod.invoke(mxBean); - } catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) { - return -1L; - } + return mxBean.getProcessCpuTime(); } }); - } catch (ClassNotFoundException | InvocationTargetException | SecurityException | NoSuchMethodException | IllegalArgumentException | IllegalAccessException ignored) { + } catch (Exception e) { LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + - " - CPU load metrics will not be available."); - // make sure that a metric still exists for the given name - metrics.gauge("Load", new Gauge<Double>() { - @Override - public Double getValue() { - return -1.0; - } - }); - metrics.gauge("Time", new Gauge<Long>() { - @Override - public Long getValue() { - return -1L; - } - }); + " - CPU load metrics will not be available.", e); + } + } + + private static final class AttributeGauge<T> implements Gauge<T> { + private final MBeanServer server; + private final ObjectName objectName; + private final String attributeName; + private final T errorValue; + + private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) { + this.server = Preconditions.checkNotNull(server); + this.objectName = Preconditions.checkNotNull(objectName); + this.attributeName = Preconditions.checkNotNull(attributeName); + this.errorValue = errorValue; + } + + @SuppressWarnings("unchecked") + @Override + public T getValue() { + try { + 
return (T) server.getAttribute(objectName, attributeName); + } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) { + LOG.warn("Could not read attribute {}.", attributeName, e); + return errorValue; + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java deleted file mode 100644 index 1f8d5ed..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.taskexecutor.utils; - -import com.sun.management.OperatingSystemMXBean; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.taskexecutor.TaskExecutor; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.AttributeNotFoundException; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.ReflectionException; -import java.lang.management.ClassLoadingMXBean; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.lang.management.ThreadMXBean; -import java.util.List; - -/** - * Utility class ot initialize {@link TaskExecutor} specific metrics. 
- */ -public class TaskExecutorMetricsInitializer { - private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorMetricsInitializer.class); - - public static void instantiateStatusMetrics( - MetricGroup taskManagerMetricGroup, - NetworkEnvironment network) { - MetricGroup status = taskManagerMetricGroup.addGroup("Status"); - - instantiateNetworkMetrics(status.addGroup("Network"), network); - - MetricGroup jvm = status.addGroup("JVM"); - - instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader")); - instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector")); - instantiateMemoryMetrics(jvm.addGroup("Memory")); - instantiateThreadMetrics(jvm.addGroup("Threads")); - instantiateCPUMetrics(jvm.addGroup("CPU")); - } - - private static void instantiateNetworkMetrics( - MetricGroup metrics, - final NetworkEnvironment network) { - metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () { - @Override - public Long getValue() { - return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments(); - } - }); - - metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () { - @Override - public Long getValue() { - return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments(); - } - }); - } - - private static void instantiateClassLoaderMetrics(MetricGroup metrics) { - final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean(); - - metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getTotalLoadedClassCount(); - } - }); - - metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getUnloadedClassCount(); - } - }); - } - - private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) { - List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans(); - - for (final GarbageCollectorMXBean 
garbageCollector: garbageCollectors) { - MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName()); - - gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () { - @Override - public Long getValue() { - return garbageCollector.getCollectionCount(); - } - }); - - gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () { - @Override - public Long getValue() { - return garbageCollector.getCollectionTime(); - } - }); - } - } - - private static void instantiateMemoryMetrics(MetricGroup metrics) { - final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); - - MetricGroup heap = metrics.addGroup("Heap"); - - heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getUsed(); - } - }); - heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getCommitted(); - } - }); - heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getMax(); - } - }); - - MetricGroup nonHeap = metrics.addGroup("NonHeap"); - - nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getUsed(); - } - }); - nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getCommitted(); - } - }); - nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getMax(); - } - }); - - final MBeanServer con = ManagementFactory.getPlatformMBeanServer(); - - final String directBufferPoolName = "java.nio:type=BufferPool,name=direct"; - - try { - final ObjectName directObjectName = new ObjectName(directBufferPoolName); - - MetricGroup direct = metrics.addGroup("Direct"); - - direct.<Long, 
Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "Count", -1L)); - direct.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L)); - direct.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L)); - } catch (MalformedObjectNameException e) { - LOG.warn("Could not create object name {}.", directBufferPoolName, e); - } - - final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped"; - - try { - final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName); - - MetricGroup mapped = metrics.addGroup("Mapped"); - - mapped.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "Count", -1L)); - mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L)); - mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L)); - } catch (MalformedObjectNameException e) { - LOG.warn("Could not create object name {}.", mappedBufferPoolName, e); - } - } - - private static void instantiateThreadMetrics(MetricGroup metrics) { - final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); - - metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () { - @Override - public Integer getValue() { - return mxBean.getThreadCount(); - } - }); - } - - private static void instantiateCPUMetrics(MetricGroup metrics) { - try { - final OperatingSystemMXBean mxBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); - - metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () { - @Override - public Double getValue() { - return mxBean.getProcessCpuLoad(); - } - }); - metrics.<Long, 
Gauge<Long>>gauge("Time", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getProcessCpuTime(); - } - }); - } catch (Exception e) { - LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + - " - CPU load metrics will not be available.", e); - } - } - - private static final class AttributeGauge<T> implements Gauge<T> { - private final MBeanServer server; - private final ObjectName objectName; - private final String attributeName; - private final T errorValue; - - private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) { - this.server = Preconditions.checkNotNull(server); - this.objectName = Preconditions.checkNotNull(objectName); - this.attributeName = Preconditions.checkNotNull(attributeName); - this.errorValue = errorValue; - } - - @SuppressWarnings("unchecked") - @Override - public T getValue() { - try { - return (T) server.getAttribute(objectName, attributeName); - } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) { - LOG.warn("Could not read attribute {}.", attributeName, e); - return errorValue; - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d40a0fd..4fb1196 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1878,7 +1878,6 @@ class JobManager( jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new Gauge[Long] { override def getValue: Long = JobManager.this.currentJobs.size }) - 
MetricUtils.instantiateStatusMetrics(jobManagerMetricGroup) } } @@ -2513,7 +2512,7 @@ object JobManager { } } - val jobManagerMetricGroup = new JobManagerMetricGroup( + val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup( metricRegistry, configuration.getString(JobManagerOptions.ADDRESS))
