Repository: flink
Updated Branches:
  refs/heads/master 3ce8596b4 -> 5b54009eb


[FLINK-4544[ Refactor JM/TM metrics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b54009e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b54009e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b54009e

Branch: refs/heads/master
Commit: 5b54009ebdf1602bdc9860b46ee34e65ef74246a
Parents: 3ce8596
Author: zentol <[email protected]>
Authored: Fri Oct 28 12:09:56 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Oct 28 13:41:39 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/metrics/util/MetricUtils.java | 245 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 128 +---------
 .../flink/runtime/taskmanager/TaskManager.scala | 186 ++------------
 3 files changed, 266 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
new file mode 100644
index 0000000..64d06ce
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http//www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+public class MetricUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class);
+       private static final String METRIC_GROUP_STATUS_NAME = "Status";
+
+       private MetricUtils() {
+       }
+
+       public static void instantiateNetworkMetrics(
+               MetricGroup metrics,
+               final NetworkEnvironment network) {
+               MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME);
+
+               status.gauge("TotalMemorySegments", new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return 
network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+                       }
+               });
+               status.gauge("AvailableMemorySegments", new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return 
network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+                       }
+               });
+       }
+
+       public static void instantiateStatusMetrics(
+               MetricGroup metrics) {
+               MetricGroup status = metrics
+                       .addGroup(METRIC_GROUP_STATUS_NAME);
+
+               MetricGroup jvm = status
+                       .addGroup("JVM");
+
+               instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+               
instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+               instantiateMemoryMetrics(jvm.addGroup("Memory"));
+               instantiateThreadMetrics(jvm.addGroup("Threads"));
+               instantiateCPUMetrics(jvm.addGroup("CPU"));
+       }
+
+       private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
+               final ClassLoadingMXBean mxBean = 
ManagementFactory.getClassLoadingMXBean();
+
+               metrics.gauge("ClassesLoaded", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getTotalLoadedClassCount();
+                       }
+               });
+               metrics.gauge("ClassesUnloaded", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getUnloadedClassCount();
+                       }
+               });
+       }
+
+       private static void instantiateGarbageCollectorMetrics(MetricGroup 
metrics) {
+               List<GarbageCollectorMXBean> garbageCollectors = 
ManagementFactory.getGarbageCollectorMXBeans();
+
+               for (final GarbageCollectorMXBean garbageCollector : 
garbageCollectors) {
+                       MetricGroup gcGroup = 
metrics.addGroup(garbageCollector.getName());
+                       gcGroup.gauge("Count", new Gauge<Long>() {
+                               @Override
+                               public Long getValue() {
+                                       return 
garbageCollector.getCollectionCount();
+                               }
+                       });
+                       gcGroup.gauge("Time", new Gauge<Long>() {
+                               @Override
+                               public Long getValue() {
+                                       return 
garbageCollector.getCollectionTime();
+                               }
+                       });
+               }
+       }
+
+       private static void instantiateMemoryMetrics(MetricGroup metrics) {
+               final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+               MetricGroup heap = metrics.addGroup("Heap");
+               heap.gauge("Used", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getHeapMemoryUsage().getUsed();
+                       }
+               });
+               heap.gauge("Committed", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return 
mxBean.getHeapMemoryUsage().getCommitted();
+                       }
+               });
+               heap.gauge("Max", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getHeapMemoryUsage().getMax();
+                       }
+               });
+
+               MetricGroup nonHeap = metrics.addGroup("NonHeap");
+               nonHeap.gauge("Used", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getNonHeapMemoryUsage().getUsed();
+                       }
+               });
+               nonHeap.gauge("Committed", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return 
mxBean.getNonHeapMemoryUsage().getCommitted();
+                       }
+               });
+               nonHeap.gauge("Max", new Gauge<Long>() {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getNonHeapMemoryUsage().getMax();
+                       }
+               });
+
+               List<BufferPoolMXBean> bufferMxBeans = 
ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+
+               for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) {
+                       MetricGroup bufferGroup = 
metrics.addGroup(bufferMxBean.getName());
+                       bufferGroup.gauge("Count", new Gauge<Long>() {
+                               @Override
+                               public Long getValue() {
+                                       return bufferMxBean.getCount();
+                               }
+                       });
+                       bufferGroup.gauge("MemoryUsed", new Gauge<Long>() {
+                               @Override
+                               public Long getValue() {
+                                       return bufferMxBean.getMemoryUsed();
+                               }
+                       });
+                       bufferGroup.gauge("TotalCapacity", new Gauge<Long>() {
+                               @Override
+                               public Long getValue() {
+                                       return bufferMxBean.getTotalCapacity();
+                               }
+                       });
+               }
+       }
+
+       private static void instantiateThreadMetrics(MetricGroup metrics) {
+               final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+               metrics.gauge("Count", new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return mxBean.getThreadCount();
+                       }
+               });
+       }
+
+       private static void instantiateCPUMetrics(MetricGroup metrics) {
+               try {
+                       final OperatingSystemMXBean mxBean = 
ManagementFactory.getOperatingSystemMXBean();
+
+                       final Method fetchCPULoadMethod = 
Class.forName("com.sun.management.OperatingSystemMXBean")
+                               .getMethod("getProcessCpuLoad");
+                       // verify that we can invoke the method
+                       fetchCPULoadMethod.invoke(mxBean);
+
+                       final Method fetchCPUTimeMethod = 
Class.forName("com.sun.management.OperatingSystemMXBean")
+                               .getMethod("getProcessCpuTime");
+                       // verify that we can invoke the method
+                       fetchCPUTimeMethod.invoke(mxBean);
+
+                       metrics.gauge("Load", new Gauge<Double>() {
+                               @Override
+                               public Double getValue() {
+                                       try {
+                                               return (Double) 
fetchCPULoadMethod.invoke(mxBean);
+                                       } catch (IllegalAccessException | 
InvocationTargetException | IllegalArgumentException ignored) {
+                                               return -1.0;
+                                       }
+                               }
+                       });
+                       metrics.gauge("Time", new Gauge<Long>() {
+                               @Override
+                               public Long getValue() {
+                                       try {
+                                               return (Long) 
fetchCPUTimeMethod.invoke(mxBean);
+                                       } catch (IllegalAccessException | 
InvocationTargetException | IllegalArgumentException ignored) {
+                                               return -1L;
+                                       }
+                               }
+                       });
+               } catch (ClassNotFoundException | InvocationTargetException | 
SecurityException | NoSuchMethodException | IllegalArgumentException | 
IllegalAccessException ignored) {
+                       LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+                               " - CPU load metrics will not be available.");
+                       // make sure that a metric still exists for the given 
name
+                       metrics.gauge("Load", new Gauge<Double>() {
+                               @Override
+                               public Double getValue() {
+                                       return -1.0;
+                               }
+                       });
+                       metrics.gauge("Time", new Gauge<Long>() {
+                               @Override
+                               public Long getValue() {
+                                       return -1L;
+                               }
+                       });
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8ea053e..516bbbe 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.lang.management.ManagementFactory
 import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, 
UnknownHostException}
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
-import javax.management.ObjectName
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -69,6 +67,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, 
NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
@@ -1842,130 +1841,7 @@ class JobManager(
     jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new 
Gauge[Long] {
       override def getValue: Long = JobManager.this.currentJobs.size
     })
-    instantiateStatusMetrics(jobManagerMetricGroup)
-  }
-
-  private def instantiateStatusMetrics(jobManagerMetricGroup: MetricGroup) : 
Unit = {
-    val jvm = jobManagerMetricGroup
-      .addGroup("Status")
-      .addGroup("JVM")
-
-    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
-    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
-    instantiateMemoryMetrics(jvm.addGroup("Memory"))
-    instantiateThreadMetrics(jvm.addGroup("Threads"))
-    instantiateCPUMetrics(jvm.addGroup("CPU"))
-  }
-
-  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getClassLoadingMXBean
-
-    metrics.gauge[Long, Gauge[Long]]("ClassesLoaded", new Gauge[Long] {
-      override def getValue: Long = mxBean.getTotalLoadedClassCount
-    })
-    metrics.gauge[Long, Gauge[Long]]("ClassesUnloaded", new Gauge[Long] {
-      override def getValue: Long = mxBean.getUnloadedClassCount
-    })
-  }
-
-  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
-    val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
-
-    for (garbageCollector <- garbageCollectors.asScala) {
-      val gcGroup = metrics.addGroup(garbageCollector.getName)
-      gcGroup.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionCount
-      })
-      gcGroup.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionTime
-      })
-    }
-  }
-
-  private def instantiateMemoryMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getMemoryMXBean
-    val heap = metrics.addGroup("Heap")
-    heap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
-    })
-    heap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
-    })
-    heap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
-    })
-
-    val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
-    })
-    nonHeap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
-    })
-    nonHeap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
-    })
-
-    val con = ManagementFactory.getPlatformMBeanServer;
-
-    val directObjectName = new 
ObjectName("java.nio:type=BufferPool,name=direct")
-
-    val direct = metrics.addGroup("Direct")
-    direct.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "Count").asInstanceOf[Long]
-    })
-    direct.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    direct.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-
-    val mappedObjectName = new 
ObjectName("java.nio:type=BufferPool,name=mapped")
-
-    val mapped = metrics.addGroup("Mapped")
-    mapped.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-  }
-
-  private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
-    val mxBean = ManagementFactory.getThreadMXBean
-
-    metrics.gauge[Int, Gauge[Int]]("Count", new Gauge[Int] {
-      override def getValue: Int = mxBean.getThreadCount
-    })
-  }
-
-  private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
-    try {
-      val mxBean = ManagementFactory.getOperatingSystemMXBean
-        .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
-      metrics.gauge[Double, Gauge[Double]]("Load", new Gauge[Double] {
-        override def getValue: Double = mxBean.getProcessCpuLoad
-      })
-      metrics.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
-        override def getValue: Long = mxBean.getProcessCpuTime
-      })
-    }
-    catch {
-      case t: Throwable =>
-        log.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-          " - CPU load metrics will not be available.")
-    }
+    MetricUtils.instantiateStatusMetrics(jobManagerMetricGroup)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8b597d0..9727860 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,12 +20,10 @@ package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, FileInputStream, IOException}
 import java.lang.management.{ManagementFactory, OperatingSystemMXBean}
-import java.lang.reflect.Method
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-import javax.management.ObjectName
 
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
@@ -39,7 +37,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, 
MemorySegmentFactory, MemoryType}
-import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -69,6 +66,7 @@ import org.apache.flink.runtime.messages.TaskMessages._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
NotifyCheckpointComplete, TriggerCheckpoint}
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateRegistry
 import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, 
KvStateServer}
@@ -998,7 +996,8 @@ class TaskManager(
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, 
this.runtimeInfo.getHostname, id.toString)
     
-    TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
+    MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup)
+    MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network)
     
     // watch job manager to detect when it dies
     context.watch(jobManager)
@@ -2502,176 +2501,29 @@ object TaskManager {
         ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
     })
 
-    // Pre-processing steps for registering cpuLoad
     val osBean: OperatingSystemMXBean = 
ManagementFactory.getOperatingSystemMXBean()
-        
-    val fetchCPULoadMethod: Option[Method] = 
-      try {
-        Class.forName("com.sun.management.OperatingSystemMXBean")
-          .getMethods()
-          .find( _.getName() == "getProcessCpuLoad" )
-      }
-      catch {
-        case t: Throwable =>
-          LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-            " - CPU load metrics will not be available.")
-          None
-      }
-
-    metricRegistry.register("cpuLoad", new Gauge[Double] {
-      override def getValue: Double = {
-        try{
-          
fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
-        }
-        catch {
-          case t: Throwable =>
-            LOG.warn("Error retrieving CPU Load through 
OperatingSystemMXBean", t)
-            -1.0
-        }
-      }
-    })
-    metricRegistry
-  }
-
-  private def instantiateStatusMetrics(
-      taskManagerMetricGroup: MetricGroup,
-      network: NetworkEnvironment)
-    : Unit = {
-    val status = taskManagerMetricGroup
-      .addGroup("Status")
-
-    instantiateNetworkMetrics(status.addGroup("Network"), network)
 
-    val jvm = status
-      .addGroup("JVM")
-
-    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
-    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
-    instantiateMemoryMetrics(jvm.addGroup("Memory"))
-    instantiateThreadMetrics(jvm.addGroup("Threads"))
-    instantiateCPUMetrics(jvm.addGroup("CPU"))
-  }
-
-  private def instantiateNetworkMetrics(
-        metrics: MetricGroup,
-        network: NetworkEnvironment)
-    : Unit = {
-    metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new 
FlinkGauge[Long] {
-      override def getValue: Long = 
network.getNetworkBufferPool.getTotalNumberOfMemorySegments
-    })
-    metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new 
FlinkGauge[Long] {
-      override def getValue: Long = 
network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
-    })
-  }
-
-  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getClassLoadingMXBean
-
-    metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new 
FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getTotalLoadedClassCount
-    })
-    metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new 
FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getUnloadedClassCount
-    })
-  }
+    try {
+      val fetchCPULoadMethod = 
Class.forName("com.sun.management.OperatingSystemMXBean")
+        .getMethods()
+        .find( _.getName() == "getProcessCpuLoad" )
 
-  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
-    val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
+      // verify that we can invoke the method
+      
fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
 
-    for (garbageCollector <- garbageCollectors.asScala) {
-      val gcGroup = metrics.addGroup(garbageCollector.getName)
-      gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionCount
-      })
-      gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionTime
+      metricRegistry.register("cpuLoad", new Gauge[Double] {
+        override def getValue: Double = fetchCPULoadMethod
+          .map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
       })
     }
-  }
-
-  private def instantiateMemoryMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getMemoryMXBean
-    val heap = metrics.addGroup("Heap")
-    heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
-    })
-    heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
-    })
-    heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
-    })
-
-    val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
-    })
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
-    })
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
-    })
-
-    val con = ManagementFactory.getPlatformMBeanServer;
-
-    val directObjectName = new 
ObjectName("java.nio:type=BufferPool,name=direct")
-
-    val direct = metrics.addGroup("Direct")
-    direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "Count").asInstanceOf[Long]
-    })
-    direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] 
{
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-
-    val mappedObjectName = new 
ObjectName("java.nio:type=BufferPool,name=mapped")
-
-    val mapped = metrics.addGroup("Mapped")
-    mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] 
{
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-  }
-
-  private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
-    val mxBean = ManagementFactory.getThreadMXBean
-
-    metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
-      override def getValue: Int = mxBean.getThreadCount
-    })
-  }
-
-  private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
-    try {
-      val mxBean = ManagementFactory.getOperatingSystemMXBean
-        .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
-      metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] 
{
-          override def getValue: Double = mxBean.getProcessCpuLoad
-        })
-      metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
-          override def getValue: Long = mxBean.getProcessCpuTime
-        })
-    }
     catch {
-     case t: Throwable =>
-       LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-        " - CPU load metrics will not be available.") 
+      case t: Throwable =>
+        LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+          " - CPU load metrics will not be available.")
+        metricRegistry.register("cpuLoad", new Gauge[Double] {
+          override def getValue: Double = -1.0
+        })
     }
+    metricRegistry
   }
 }

Reply via email to