[FLINK-2208] [runtime] Fix reflective access to CPU load to support IBM Java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60cfa0b2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60cfa0b2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60cfa0b2 Branch: refs/heads/master Commit: 60cfa0b2dcb4dae1637d867c4c63c67b3d26adf9 Parents: 8acc0d2 Author: Stephan Ewen <[email protected]> Authored: Wed Jul 8 11:58:00 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Jul 8 20:28:40 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/taskmanager/TaskManager.scala | 47 +++++++++----------- 1 file changed, 21 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/60cfa0b2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 612d5c0..1c60e89 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -22,7 +22,7 @@ import java.io.{File, IOException} import java.net.{InetAddress, InetSocketAddress} import java.util.concurrent.TimeUnit import java.lang.reflect.Method -import java.lang.management.ManagementFactory +import java.lang.management.{OperatingSystemMXBean, ManagementFactory} import akka.actor._ import akka.pattern.ask @@ -35,7 +35,7 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet} import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger -import org.apache.flink.configuration._ +import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException} import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage} import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils @@ -1752,40 +1752,35 @@ object TaskManager { ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage() }) - // Preprocessing steps for registering cpuLoad - val fetchCPULoad = getMethodToFetchCPULoad() - - // Log getProcessCpuLoad unavailable for Java 6 - if(fetchCPULoad.isEmpty){ - LOG.warn("getProcessCpuLoad method not available in the Operating System Bean" + - "implementation for this Java runtime environment\n" + - Thread.currentThread().getStackTrace) - } + // Pre-processing steps for registering cpuLoad + val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean() + + val fetchCPULoadMethod: Option[Method] = + try { + Class.forName("com.sun.management.OperatingSystemMXBean") + .getMethods() + .find( _.getName() == "getProcessCpuLoad" ) + } + catch { + case t: Throwable => + LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + + " - CPU load metrics will not be available.") + None + } metricRegistry.register("cpuLoad", new Gauge[Double] { override def getValue: Double = { try{ - fetchCPULoad.map(_.invoke(ManagementFactory.getOperatingSystemMXBean(). - asInstanceOf[com.sun.management.OperatingSystemMXBean]). - asInstanceOf[Double]).getOrElse(-1) - } catch { + fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0) + } + catch { case t: Throwable => { LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t) - -1 + -1.0 } } } }) metricRegistry } - - /** - * Fetches getProcessCpuLoad method if available in the - * OperatingSystemMXBean implementation else returns None - * @return - */ - private def getMethodToFetchCPULoad(): Option[Method] = { - val methodsList = classOf[com.sun.management.OperatingSystemMXBean].getMethods() - methodsList.filter(_.getName == "getProcessCpuLoad").headOption - } }
