[FLINK-4452] [metrics] TaskManager network buffer gauges

Adds gauges for the number of total and available TaskManager network memory segments.
This closes #2408 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28743cfb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28743cfb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28743cfb Branch: refs/heads/flip-6 Commit: 28743cfb86545cf9eaf4ec2cf37ec460a13f3537 Parents: 58165d6 Author: Greg Hogan <c...@greghogan.com> Authored: Tue Aug 23 10:46:48 2016 -0400 Committer: Greg Hogan <c...@greghogan.com> Committed: Wed Aug 24 09:02:15 2016 -0400 ---------------------------------------------------------------------- docs/monitoring/metrics.md | 9 +++++++ .../flink/runtime/taskmanager/TaskManager.scala | 25 +++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/docs/monitoring/metrics.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 023bef9..3a148e1 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -335,6 +335,15 @@ Flink exposes the following system metrics: <td></td> </tr> <tr> + <th rowspan="2"><strong>TaskManager.Status</strong></th> + <td>Network.AvailableMemorySegments</td> + <td>The number of unused memory segments.</td> + </tr> + <tr> + <td>Network.TotalMemorySegments</td> + <td>The number of allocated memory segments.</td> + </tr> + <tr> <th rowspan="19"><strong>TaskManager.Status.JVM</strong></th> <td>ClassLoader.ClassesLoaded</td> <td>The total number of classes loaded since the start of the JVM.</td> http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 5a95143..72ec2ac 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -965,7 +965,7 @@ class TaskManager( taskManagerMetricGroup = new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString) - TaskManager.instantiateStatusMetrics(taskManagerMetricGroup) + TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network) // watch job manager to detect when it dies context.watch(jobManager) @@ -2357,9 +2357,16 @@ object TaskManager { metricRegistry } - private def instantiateStatusMetrics(taskManagerMetricGroup: MetricGroup) : Unit = { - val jvm = taskManagerMetricGroup + private def instantiateStatusMetrics( + taskManagerMetricGroup: MetricGroup, + network: NetworkEnvironment) + : Unit = { + val status = taskManagerMetricGroup .addGroup("Status") + + instantiateNetworkMetrics(status.addGroup("Network"), network) + + val jvm = status .addGroup("JVM") instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader")) @@ -2369,6 +2376,18 @@ object TaskManager { instantiateCPUMetrics(jvm.addGroup("CPU")) } + private def instantiateNetworkMetrics( + metrics: MetricGroup, + network: NetworkEnvironment) + : Unit = { + metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] { + override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments + }) + metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] { + override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments + }) + } + private def instantiateClassLoaderMetrics(metrics: MetricGroup) { val mxBean = ManagementFactory.getClassLoadingMXBean