Repository: flink Updated Branches: refs/heads/master 3ce8596b4 -> 5b54009eb
[FLINK-4544] Refactor JM/TM metrics Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b54009e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b54009e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b54009e Branch: refs/heads/master Commit: 5b54009ebdf1602bdc9860b46ee34e65ef74246a Parents: 3ce8596 Author: zentol <[email protected]> Authored: Fri Oct 28 12:09:56 2016 +0200 Committer: zentol <[email protected]> Committed: Fri Oct 28 13:41:39 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/metrics/util/MetricUtils.java | 245 +++++++++++++++++++ .../flink/runtime/jobmanager/JobManager.scala | 128 +--------- .../flink/runtime/taskmanager/TaskManager.scala | 186 ++------------ 3 files changed, 266 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java new file mode 100644 index 0000000..64d06ce --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ClassLoadingMXBean; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.OperatingSystemMXBean; +import java.lang.management.ThreadMXBean; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; + +public class MetricUtils { + private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class); + private static final String METRIC_GROUP_STATUS_NAME = "Status"; + + private MetricUtils() { + } + + public static void instantiateNetworkMetrics( + MetricGroup metrics, + final NetworkEnvironment network) { + MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME); + + status.gauge("TotalMemorySegments", new Gauge<Integer>() { + @Override + public Integer getValue() { + return network.getNetworkBufferPool().getTotalNumberOfMemorySegments(); + } + }); + status.gauge("AvailableMemorySegments", new Gauge<Integer>() { + @Override + public Integer getValue() { + return network.getNetworkBufferPool().getNumberOfAvailableMemorySegments(); + } + }); + } + + public static void instantiateStatusMetrics( + MetricGroup metrics) { + MetricGroup 
status = metrics + .addGroup(METRIC_GROUP_STATUS_NAME); + + MetricGroup jvm = status + .addGroup("JVM"); + + instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader")); + instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector")); + instantiateMemoryMetrics(jvm.addGroup("Memory")); + instantiateThreadMetrics(jvm.addGroup("Threads")); + instantiateCPUMetrics(jvm.addGroup("CPU")); + } + + private static void instantiateClassLoaderMetrics(MetricGroup metrics) { + final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean(); + + metrics.gauge("ClassesLoaded", new Gauge<Long>() { + @Override + public Long getValue() { + return mxBean.getTotalLoadedClassCount(); + } + }); + metrics.gauge("ClassesUnloaded", new Gauge<Long>() { + @Override + public Long getValue() { + return mxBean.getUnloadedClassCount(); + } + }); + } + + private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) { + List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans(); + + for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) { + MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName()); + gcGroup.gauge("Count", new Gauge<Long>() { + @Override + public Long getValue() { + return garbageCollector.getCollectionCount(); + } + }); + gcGroup.gauge("Time", new Gauge<Long>() { + @Override + public Long getValue() { + return garbageCollector.getCollectionTime(); + } + }); + } + } + + private static void instantiateMemoryMetrics(MetricGroup metrics) { + final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + MetricGroup heap = metrics.addGroup("Heap"); + heap.gauge("Used", new Gauge<Long>() { + @Override + public Long getValue() { + return mxBean.getHeapMemoryUsage().getUsed(); + } + }); + heap.gauge("Committed", new Gauge<Long>() { + @Override + public Long getValue() { + return mxBean.getHeapMemoryUsage().getCommitted(); + } + }); + heap.gauge("Max", new Gauge<Long>() { + @Override + 
public Long getValue() { + return mxBean.getHeapMemoryUsage().getMax(); + } + }); + + MetricGroup nonHeap = metrics.addGroup("NonHeap"); + nonHeap.gauge("Used", new Gauge<Long>() { + @Override + public Long getValue() { + return mxBean.getNonHeapMemoryUsage().getUsed(); + } + }); + nonHeap.gauge("Committed", new Gauge<Long>() { + @Override + public Long getValue() { + return mxBean.getNonHeapMemoryUsage().getCommitted(); + } + }); + nonHeap.gauge("Max", new Gauge<Long>() { + @Override + public Long getValue() { + return mxBean.getNonHeapMemoryUsage().getMax(); + } + }); + + List<BufferPoolMXBean> bufferMxBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + + for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) { + MetricGroup bufferGroup = metrics.addGroup(bufferMxBean.getName()); + bufferGroup.gauge("Count", new Gauge<Long>() { + @Override + public Long getValue() { + return bufferMxBean.getCount(); + } + }); + bufferGroup.gauge("MemoryUsed", new Gauge<Long>() { + @Override + public Long getValue() { + return bufferMxBean.getMemoryUsed(); + } + }); + bufferGroup.gauge("TotalCapacity", new Gauge<Long>() { + @Override + public Long getValue() { + return bufferMxBean.getTotalCapacity(); + } + }); + } + } + + private static void instantiateThreadMetrics(MetricGroup metrics) { + final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); + + metrics.gauge("Count", new Gauge<Integer>() { + @Override + public Integer getValue() { + return mxBean.getThreadCount(); + } + }); + } + + private static void instantiateCPUMetrics(MetricGroup metrics) { + try { + final OperatingSystemMXBean mxBean = ManagementFactory.getOperatingSystemMXBean(); + + final Method fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean") + .getMethod("getProcessCpuLoad"); + // verify that we can invoke the method + fetchCPULoadMethod.invoke(mxBean); + + final Method fetchCPUTimeMethod = Class.forName("com.sun.management.OperatingSystemMXBean") + 
.getMethod("getProcessCpuTime"); + // verify that we can invoke the method + fetchCPUTimeMethod.invoke(mxBean); + + metrics.gauge("Load", new Gauge<Double>() { + @Override + public Double getValue() { + try { + return (Double) fetchCPULoadMethod.invoke(mxBean); + } catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) { + return -1.0; + } + } + }); + metrics.gauge("Time", new Gauge<Long>() { + @Override + public Long getValue() { + try { + return (Long) fetchCPUTimeMethod.invoke(mxBean); + } catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) { + return -1L; + } + } + }); + } catch (ClassNotFoundException | InvocationTargetException | SecurityException | NoSuchMethodException | IllegalArgumentException | IllegalAccessException ignored) { + LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + + " - CPU load metrics will not be available."); + // make sure that a metric still exists for the given name + metrics.gauge("Load", new Gauge<Double>() { + @Override + public Double getValue() { + return -1.0; + } + }); + metrics.gauge("Time", new Gauge<Long>() { + @Override + public Long getValue() { + return -1L; + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 8ea053e..516bbbe 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -19,11 +19,9 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} -import 
java.lang.management.ManagementFactory import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException} import java.util.UUID import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} -import javax.management.ObjectName import akka.actor.Status.{Failure, Success} import akka.actor._ @@ -69,6 +67,7 @@ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered} import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation} @@ -1842,130 +1841,7 @@ class JobManager( jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new Gauge[Long] { override def getValue: Long = JobManager.this.currentJobs.size }) - instantiateStatusMetrics(jobManagerMetricGroup) - } - - private def instantiateStatusMetrics(jobManagerMetricGroup: MetricGroup) : Unit = { - val jvm = jobManagerMetricGroup - .addGroup("Status") - .addGroup("JVM") - - instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader")) - instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector")) - instantiateMemoryMetrics(jvm.addGroup("Memory")) - instantiateThreadMetrics(jvm.addGroup("Threads")) - instantiateCPUMetrics(jvm.addGroup("CPU")) - } - - private def instantiateClassLoaderMetrics(metrics: MetricGroup) { - val mxBean = ManagementFactory.getClassLoadingMXBean - - metrics.gauge[Long, Gauge[Long]]("ClassesLoaded", new Gauge[Long] { - override def getValue: Long = mxBean.getTotalLoadedClassCount - }) - metrics.gauge[Long, 
Gauge[Long]]("ClassesUnloaded", new Gauge[Long] { - override def getValue: Long = mxBean.getUnloadedClassCount - }) - } - - private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) { - val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans - - for (garbageCollector <- garbageCollectors.asScala) { - val gcGroup = metrics.addGroup(garbageCollector.getName) - gcGroup.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] { - override def getValue: Long = garbageCollector.getCollectionCount - }) - gcGroup.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] { - override def getValue: Long = garbageCollector.getCollectionTime - }) - } - } - - private def instantiateMemoryMetrics(metrics: MetricGroup) { - val mxBean = ManagementFactory.getMemoryMXBean - val heap = metrics.addGroup("Heap") - heap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed - }) - heap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted - }) - heap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getMax - }) - - val nonHeap = metrics.addGroup("NonHeap") - nonHeap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed - }) - nonHeap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted - }) - nonHeap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax - }) - - val con = ManagementFactory.getPlatformMBeanServer; - - val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct") - - val direct = metrics.addGroup("Direct") - direct.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, 
"Count").asInstanceOf[Long] - }) - direct.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long] - }) - direct.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long] - }) - - val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped") - - val mapped = metrics.addGroup("Mapped") - mapped.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "Count").asInstanceOf[Long] - }) - mapped.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long] - }) - mapped.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long] - }) - } - - private def instantiateThreadMetrics(metrics: MetricGroup): Unit = { - val mxBean = ManagementFactory.getThreadMXBean - - metrics.gauge[Int, Gauge[Int]]("Count", new Gauge[Int] { - override def getValue: Int = mxBean.getThreadCount - }) - } - - private def instantiateCPUMetrics(metrics: MetricGroup): Unit = { - try { - val mxBean = ManagementFactory.getOperatingSystemMXBean - .asInstanceOf[com.sun.management.OperatingSystemMXBean] - - metrics.gauge[Double, Gauge[Double]]("Load", new Gauge[Double] { - override def getValue: Double = mxBean.getProcessCpuLoad - }) - metrics.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] { - override def getValue: Long = mxBean.getProcessCpuTime - }) - } - catch { - case t: Throwable => - log.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + - " - CPU load metrics will not be available.") - } + MetricUtils.instantiateStatusMetrics(jobManagerMetricGroup) } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 8b597d0..9727860 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -20,12 +20,10 @@ package org.apache.flink.runtime.taskmanager import java.io.{File, FileInputStream, IOException} import java.lang.management.{ManagementFactory, OperatingSystemMXBean} -import java.lang.reflect.Method import java.net.{InetAddress, InetSocketAddress} import java.util import java.util.UUID import java.util.concurrent.TimeUnit -import javax.management.ObjectName import _root_.akka.actor._ import _root_.akka.pattern.ask @@ -39,7 +37,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType} -import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID @@ -69,6 +66,7 @@ import org.apache.flink.runtime.messages.TaskMessages._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint} import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup +import 
org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.query.KvStateRegistry import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer} @@ -998,7 +996,8 @@ class TaskManager( taskManagerMetricGroup = new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString) - TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network) + MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup) + MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network) // watch job manager to detect when it dies context.watch(jobManager) @@ -2502,176 +2501,29 @@ object TaskManager { ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage() }) - // Pre-processing steps for registering cpuLoad val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean() - - val fetchCPULoadMethod: Option[Method] = - try { - Class.forName("com.sun.management.OperatingSystemMXBean") - .getMethods() - .find( _.getName() == "getProcessCpuLoad" ) - } - catch { - case t: Throwable => - LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + - " - CPU load metrics will not be available.") - None - } - - metricRegistry.register("cpuLoad", new Gauge[Double] { - override def getValue: Double = { - try{ - fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0) - } - catch { - case t: Throwable => - LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t) - -1.0 - } - } - }) - metricRegistry - } - - private def instantiateStatusMetrics( - taskManagerMetricGroup: MetricGroup, - network: NetworkEnvironment) - : Unit = { - val status = taskManagerMetricGroup - .addGroup("Status") - - instantiateNetworkMetrics(status.addGroup("Network"), network) - val jvm = status - .addGroup("JVM") - - instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader")) - 
instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector")) - instantiateMemoryMetrics(jvm.addGroup("Memory")) - instantiateThreadMetrics(jvm.addGroup("Threads")) - instantiateCPUMetrics(jvm.addGroup("CPU")) - } - - private def instantiateNetworkMetrics( - metrics: MetricGroup, - network: NetworkEnvironment) - : Unit = { - metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] { - override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments - }) - metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] { - override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments - }) - } - - private def instantiateClassLoaderMetrics(metrics: MetricGroup) { - val mxBean = ManagementFactory.getClassLoadingMXBean - - metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getTotalLoadedClassCount - }) - metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getUnloadedClassCount - }) - } + try { + val fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean") + .getMethods() + .find( _.getName() == "getProcessCpuLoad" ) - private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) { - val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans + // verify that we can invoke the method + fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0) - for (garbageCollector <- garbageCollectors.asScala) { - val gcGroup = metrics.addGroup(garbageCollector.getName) - gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { - override def getValue: Long = garbageCollector.getCollectionCount - }) - gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] { - override def getValue: Long = garbageCollector.getCollectionTime + metricRegistry.register("cpuLoad", new 
Gauge[Double] { + override def getValue: Double = fetchCPULoadMethod + .map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0) }) } - } - - private def instantiateMemoryMetrics(metrics: MetricGroup) { - val mxBean = ManagementFactory.getMemoryMXBean - val heap = metrics.addGroup("Heap") - heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed - }) - heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted - }) - heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getMax - }) - - val nonHeap = metrics.addGroup("NonHeap") - nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed - }) - nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted - }) - nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax - }) - - val con = ManagementFactory.getPlatformMBeanServer; - - val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct") - - val direct = metrics.addGroup("Direct") - direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, "Count").asInstanceOf[Long] - }) - direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long] - }) - direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long] - }) - - val mappedObjectName = new 
ObjectName("java.nio:type=BufferPool,name=mapped") - - val mapped = metrics.addGroup("Mapped") - mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "Count").asInstanceOf[Long] - }) - mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long] - }) - mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long] - }) - } - - private def instantiateThreadMetrics(metrics: MetricGroup): Unit = { - val mxBean = ManagementFactory.getThreadMXBean - - metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] { - override def getValue: Int = mxBean.getThreadCount - }) - } - - private def instantiateCPUMetrics(metrics: MetricGroup): Unit = { - try { - val mxBean = ManagementFactory.getOperatingSystemMXBean - .asInstanceOf[com.sun.management.OperatingSystemMXBean] - - metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] { - override def getValue: Double = mxBean.getProcessCpuLoad - }) - metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getProcessCpuTime - }) - } catch { - case t: Throwable => - LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + - " - CPU load metrics will not be available.") + case t: Throwable => + LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + + " - CPU load metrics will not be available.") + metricRegistry.register("cpuLoad", new Gauge[Double] { + override def getValue: Double = -1.0 + }) } + metricRegistry } }
