[FLINK-4733] Port TaskManager metrics
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf4f3644 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf4f3644 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf4f3644 Branch: refs/heads/master Commit: cf4f3644c0eda42c4c872daa175d095117788086 Parents: 1f4f6f9 Author: zentol <[email protected]> Authored: Mon Oct 31 14:13:01 2016 +0100 Committer: zentol <[email protected]> Committed: Mon Oct 31 15:12:03 2016 +0100 ---------------------------------------------------------------------- .../runtime/webmonitor/WebRuntimeMonitor.java | 4 +- .../handlers/TaskManagersHandler.java | 62 +++++++++++++-- .../runtime/webmonitor/metrics/MetricStore.java | 11 +++ .../taskmanager/taskmanager.metrics.jade | 79 ++++++++----------- .../taskmanager/taskmanager.metrics.html | 82 ++++++++------------ flink-runtime/pom.xml | 17 ++-- .../apache/flink/runtime/instance/Instance.java | 10 --- .../flink/runtime/instance/InstanceManager.java | 3 +- .../flink/runtime/jobmanager/JobManager.scala | 4 +- .../runtime/messages/TaskManagerMessages.scala | 7 +- .../flink/runtime/taskmanager/TaskManager.scala | 66 +--------------- .../runtime/instance/InstanceManagerTest.java | 12 +-- 12 files changed, 153 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index e907124..a0afba2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -287,8 +287,8 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs))) .GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher))) - .GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) + .GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher))) + .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher))) .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.LOG, config, enableSSL)) http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index c20d4fe..42815ae 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers; import 
org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance; +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; +import org.apache.flink.runtime.webmonitor.metrics.MetricStore; import org.apache.flink.util.StringUtils; import scala.concurrent.Await; import scala.concurrent.Future; @@ -42,9 +44,12 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { public static final String TASK_MANAGER_ID_KEY = "taskmanagerid"; private final FiniteDuration timeout; + + private final MetricFetcher fetcher; - public TaskManagersHandler(FiniteDuration timeout) { + public TaskManagersHandler(FiniteDuration timeout, MetricFetcher fetcher) { this.timeout = requireNonNull(timeout); + this.fetcher = fetcher; } @Override @@ -95,10 +100,57 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { // only send metrics when only one task manager requests them. if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { - byte[] report = instance.getLastMetricsReport(); - if (report != null) { - gen.writeFieldName("metrics"); - gen.writeRawValue(new String(report, "utf-8")); + fetcher.update(); + MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); + if (metrics != null) { + gen.writeObjectFieldStart("metrics"); + long heapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); + long heapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); + long heapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); + + gen.writeNumberField("heapCommitted", heapCommitted); + gen.writeNumberField("heapUsed", heapUsed); + gen.writeNumberField("heapMax", heapTotal); + + long nonHeapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); + long nonHeapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); + long nonHeapTotal = Long.valueOf( 
metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); + + gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); + gen.writeNumberField("nonHeapUsed", nonHeapUsed); + gen.writeNumberField("nonHeapMax", nonHeapTotal); + + gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); + gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); + gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); + + gen.writeStringField("directCount", metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); + gen.writeStringField("directUsed", metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); + gen.writeStringField("directMax", metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); + + gen.writeStringField("mappedCount", metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); + gen.writeStringField("mappedUsed", metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); + gen.writeStringField("mappedMax", metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); + + gen.writeStringField("memorySegmentsAvailable", metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); + gen.writeStringField("memorySegmentsTotal", metrics.getMetric("Status.Network.TotalMemorySegments", "0")); + + gen.writeArrayFieldStart("garbageCollectors"); + + for (String gcName : metrics.garbageCollectorNames) { + String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); + String time = metrics.getMetric("Status.JVM.GarbageCollector." 
+ gcName + ".Time", null); + if (count != null && time != null) { + gen.writeStartObject(); + gen.writeStringField("name", gcName); + gen.writeStringField("count", count); + gen.writeStringField("time", time); + gen.writeEndObject(); + } + } + + gen.writeEndArray(); + gen.writeEndObject(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java index 989145b..51b3b4d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java @@ -23,7 +23,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER; import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE; @@ -77,6 +79,10 @@ public class MetricStore { tm = new TaskManagerMetricStore(); taskManagers.put(tmID, tm); } + if (name.contains("GarbageCollector")) { + String gcName = name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.')); + tm.addGarbageCollectorName(gcName); + } addMetric(tm.metrics, name, metric); break; case INFO_CATEGORY_JOB: @@ -260,6 +266,11 @@ public class MetricStore { * Sub-structure containing metrics of a single TaskManager. 
*/ public static class TaskManagerMetricStore extends ComponentMetricStore { + public final Set<String> garbageCollectorNames = new HashSet<>(); + + public void addGarbageCollectorName(String name) { + garbageCollectorNames.add(name); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade index c546d74..7920178 100644 --- a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade +++ b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade @@ -50,19 +50,19 @@ div(ng-if="metrics.id") tbody tr td Heap - td {{metrics.metrics.gauges['memory.heap.committed'].value | humanizeBytes}} - td {{metrics.metrics.gauges['memory.heap.init'].value | humanizeBytes}} - td {{metrics.metrics.gauges['memory.heap.max'].value | humanizeBytes}} + td {{ metrics.metrics.heapCommitted | humanizeBytes }} + td {{ metrics.metrics.heapUsed | humanizeBytes }} + td {{ metrics.metrics.heapMax | humanizeBytes }} tr td Non-Heap - td {{metrics.metrics.gauges['memory.non-heap.committed'].value | humanizeBytes}} - td {{metrics.metrics.gauges['memory.non-heap.init'].value | humanizeBytes}} - td {{metrics.metrics.gauges['memory.non-heap.max'].value | humanizeBytes}} + td {{ metrics.metrics.nonHeapCommitted | humanizeBytes }} + td {{ metrics.metrics.nonHeapUsed | humanizeBytes }} + td {{ metrics.metrics.nonHeapMax | humanizeBytes }} tr td Total - td {{metrics.metrics.gauges['memory.total.committed'].value | humanizeBytes}} - td {{metrics.metrics.gauges['memory.total.init'].value | humanizeBytes}} - td {{metrics.metrics.gauges['memory.total.max'].value | humanizeBytes}} + td {{ metrics.metrics.totalCommitted | 
humanizeBytes }} + td {{ metrics.metrics.totalUsed | humanizeBytes }} + td {{ metrics.metrics.totalMax | humanizeBytes }} h2 Outside JVM table.table.table-properties @@ -75,54 +75,41 @@ div(ng-if="metrics.id") tbody tr td Direct - td {{ metrics.metrics.gauges['direct-memory.direct.count'].value }} - td {{ metrics.metrics.gauges['direct-memory.direct.used'].value | humanizeBytes }} - td {{ metrics.metrics.gauges['direct-memory.direct.capacity'].value | humanizeBytes }} + td {{ metrics.metrics.directCount }} + td {{ metrics.metrics.directUsed }} + td {{ metrics.metrics.directTotal }} tr td Mapped - td {{ metrics.metrics.gauges['direct-memory.mapped.count'].value }} - td {{ metrics.metrics.gauges['direct-memory.mapped.used'].value | humanizeBytes }} - td {{ metrics.metrics.gauges['direct-memory.mapped.capacity'].value | humanizeBytes }} + td {{ metrics.metrics.mappedCount }} + td {{ metrics.metrics.mappedUsed }} + td {{ metrics.metrics.mappedMax }} - h1 Garbage Collection + h1 Network + + h2 MemorySegments table.table.table-properties thead tr - th Collector + th Type th Count - th Time tbody tr - td PS-MarkSweep - td(table-property value="metrics.metrics.gauges['gc.PS-MarkSweep.count'].value") - td(table-property value="metrics.metrics.gauges['gc.PS-MarkSweep.time'].value | humanizeDuration") + td Available + td {{ metrics.metrics.memorySegmentsAvailable }} tr - td PS-Scavenge - td(table-property value="metrics.metrics.gauges['gc.PS-Scavenge.count'].value") - td(table-property value="metrics.metrics.gauges['gc.PS-Scavenge.time'].value | humanizeDuration") + td Total + td {{ metrics.metrics.memorySegmentsTotal }} + - h1 Other Memory Pools + h1 Garbage Collection table.table.table-properties thead tr - th Pool - td Relative Usage - tbody - tr - td Code Cache - td(table-property value="metrics.metrics.gauges['memory.pools.Code-Cache.usage'].value | number:2") - tr - td Compressed Class Space - td(table-property 
value="metrics.metrics.gauges['memory.pools.Compressed-Class-Space.usage'].value | number:2") - tr - td Metaspace - td(table-property value="metrics.metrics.gauges['memory.pools.Metaspace.usage'].value | number:2") - tr - td PS Eden Space - td(table-property value="metrics.metrics.gauges['memory.pools.PS-Eden-Space.usage'].value | number:2") - tr - td PS Old Gen - td(table-property value="metrics.metrics.gauges['memory.pools.PS-Old-Gen.usage'].value | number:2") - tr - td PS Survivor Space - td(table-property value="metrics.metrics.gauges['memory.pools.PS-Survivor-Space.usage'].value | number:2") + th Collector + th Count + th Time + tbody(ng-repeat="g in metrics.metrics.garbageCollectors") + tr + td {{ g.name }} + td {{ g.count }} + td {{ g.time }} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html index e1d6670..43687cd 100644 --- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html +++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html @@ -57,21 +57,21 @@ limitations under the License. 
<tbody> <tr> <td>Heap</td> - <td>{{metrics.metrics.gauges['memory.heap.committed'].value | humanizeBytes}}</td> - <td>{{metrics.metrics.gauges['memory.heap.init'].value | humanizeBytes}}</td> - <td>{{metrics.metrics.gauges['memory.heap.max'].value | humanizeBytes}}</td> + <td>{{ metrics.metrics.heapCommitted | humanizeBytes }}</td> + <td>{{ metrics.metrics.heapUsed | humanizeBytes }}</td> + <td>{{ metrics.metrics.heapMax | humanizeBytes }}</td> </tr> <tr> <td>Non-Heap</td> - <td>{{metrics.metrics.gauges['memory.non-heap.committed'].value | humanizeBytes}}</td> - <td>{{metrics.metrics.gauges['memory.non-heap.init'].value | humanizeBytes}}</td> - <td>{{metrics.metrics.gauges['memory.non-heap.max'].value | humanizeBytes}}</td> + <td>{{ metrics.metrics.nonHeapCommitted | humanizeBytes }}</td> + <td>{{ metrics.metrics.nonHeapUsed | humanizeBytes }}</td> + <td>{{ metrics.metrics.nonHeapMax | humanizeBytes }}</td> </tr> <tr> <td>Total</td> - <td>{{metrics.metrics.gauges['memory.total.committed'].value | humanizeBytes}}</td> - <td>{{metrics.metrics.gauges['memory.total.init'].value | humanizeBytes}}</td> - <td>{{metrics.metrics.gauges['memory.total.max'].value | humanizeBytes}}</td> + <td>{{ metrics.metrics.totalCommitted | humanizeBytes }}</td> + <td>{{ metrics.metrics.totalUsed | humanizeBytes }}</td> + <td>{{ metrics.metrics.totalMax | humanizeBytes }}</td> </tr> </tbody> </table> @@ -88,72 +88,52 @@ limitations under the License. 
<tbody> <tr> <td>Direct</td> - <td>{{ metrics.metrics.gauges['direct-memory.direct.count'].value }}</td> - <td>{{ metrics.metrics.gauges['direct-memory.direct.used'].value | humanizeBytes }}</td> - <td>{{ metrics.metrics.gauges['direct-memory.direct.capacity'].value | humanizeBytes }}</td> + <td>{{ metrics.metrics.directCount }}</td> + <td>{{ metrics.metrics.directUsed }}</td> + <td>{{ metrics.metrics.directTotal }}</td> </tr> <tr> <td>Mapped</td> - <td>{{ metrics.metrics.gauges['direct-memory.mapped.count'].value }}</td> - <td>{{ metrics.metrics.gauges['direct-memory.mapped.used'].value | humanizeBytes }}</td> - <td>{{ metrics.metrics.gauges['direct-memory.mapped.capacity'].value | humanizeBytes }}</td> + <td>{{ metrics.metrics.mappedCount }}</td> + <td>{{ metrics.metrics.mappedUsed }}</td> + <td>{{ metrics.metrics.mappedMax }}</td> </tr> </tbody> </table> - <h1>Garbage Collection</h1> + <h1>Network</h1> + <h2>MemorySegments</h2> <table class="table table-properties"> <thead> <tr> - <th>Collector</th> + <th>Type</th> <th>Count</th> - <th>Time</th> </tr> </thead> <tbody> <tr> - <td>PS-MarkSweep</td> - <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-MarkSweep.count'].value"></td> - <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-MarkSweep.time'].value | humanizeDuration"></td> + <td>Available</td> + <td>{{ metrics.metrics.memorySegmentsAvailable }}</td> </tr> <tr> - <td>PS-Scavenge</td> - <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-Scavenge.count'].value"></td> - <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-Scavenge.time'].value | humanizeDuration"></td> + <td>Total</td> + <td>{{ metrics.metrics.memorySegmentsTotal }}</td> </tr> </tbody> </table> - <h1>Other Memory Pools</h1> + <h1>Garbage Collection</h1> <table class="table table-properties"> <thead> <tr> - <th>Pool</th> - <td>Relative Usage</td> + <th>Collector</th> + <th>Count</th> + <th>Time</th> </tr> 
</thead> - <tbody> - <tr> - <td>Code Cache</td> - <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Code-Cache.usage'].value | number:2"></td> - </tr> - <tr> - <td>Compressed Class Space</td> - <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Compressed-Class-Space.usage'].value | number:2"></td> - </tr> - <tr> - <td>Metaspace</td> - <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Metaspace.usage'].value | number:2"></td> - </tr> - <tr> - <td>PS Eden Space</td> - <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Eden-Space.usage'].value | number:2"></td> - </tr> - <tr> - <td>PS Old Gen</td> - <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Old-Gen.usage'].value | number:2"></td> - </tr> - <tr> - <td>PS Survivor Space</td> - <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Survivor-Space.usage'].value | number:2"></td> + <tbody ng-repeat="g in metrics.metrics.garbageCollectors"> + <tr> + <td>{{ g.name }}</td> + <td>{{ g.count }}</td> + <td>{{ g.time }}</td> </tr> </tbody> </table> http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 5fea8fb..0a294fa 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -123,20 +123,15 @@ under the License. 
</dependency> <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics.version}</version> - </dependency> - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-jvm</artifactId> - <version>${metrics.version}</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> </dependency> <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-json</artifactId> - <version>${metrics.version}</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index 4a8139b..d63d475 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -75,8 +75,6 @@ public class Instance implements SlotOwner { /** Time when last heat beat has been received from the task manager running on this taskManager. */ private volatile long lastReceivedHeartBeat = System.currentTimeMillis(); - private byte[] lastMetricsReport; - /** Flag marking the instance as alive or as dead. 
*/ private volatile boolean isDead; @@ -189,14 +187,6 @@ public class Instance implements SlotOwner { this.lastReceivedHeartBeat = System.currentTimeMillis(); } - public void setMetricsReport(byte[] lastMetricsReport) { - this.lastMetricsReport = lastMetricsReport; - } - - public byte[] getLastMetricsReport() { - return lastMetricsReport; - } - /** * Checks whether the last heartbeat occurred within the last {@code n} milliseconds * before the given timestamp {@code now}. http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java index b0e7e57..132ee6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java @@ -96,7 +96,7 @@ public class InstanceManager { } } - public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport) { + public boolean reportHeartBeat(InstanceID instanceId) { if (instanceId == null) { throw new IllegalArgumentException("InstanceID may not be null."); } @@ -118,7 +118,6 @@ public class InstanceManager { } host.reportHeartBeat(); - host.setMetricsReport(lastMetricsReport); if (LOG.isDebugEnabled()) { LOG.debug("Received heartbeat from TaskManager " + host); http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 516bbbe..bcfdd23 100644 
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1028,12 +1028,12 @@ class JobManager( TaskManagerInstance(Option(instanceManager.getRegisteredInstanceById(instanceID))) ) - case Heartbeat(instanceID, metricsReport, accumulators) => + case Heartbeat(instanceID, accumulators) => log.debug(s"Received heartbeat message from $instanceID.") updateAccumulators(accumulators) - instanceManager.reportHeartBeat(instanceID, metricsReport) + instanceManager.reportHeartBeat(instanceID) case message: AccumulatorMessage => handleAccumulatorMessage(message) http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala index b433015..a493b3d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala @@ -52,15 +52,12 @@ object TaskManagerMessages { /** * Reports liveliness of the TaskManager instance with the given instance ID to the - * This message is sent to the job. This message reports the TaskManagers - * metrics, as a byte array. + * JobManager. * * @param instanceID The instance ID of the reporting TaskManager. - * @param metricsReport utf-8 encoded JSON metrics report from the metricRegistry.
* @param accumulators Accumulators of tasks serialized as Tuple2[internal, user-defined] */ - case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte], - accumulators: Seq[AccumulatorSnapshot]) + case class Heartbeat(instanceID: InstanceID, accumulators: Seq[AccumulatorSnapshot]) // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 9727860..7608b87 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -19,7 +19,7 @@ package org.apache.flink.runtime.taskmanager import java.io.{File, FileInputStream, IOException} -import java.lang.management.{ManagementFactory, OperatingSystemMXBean} +import java.lang.management.ManagementFactory import java.net.{InetAddress, InetSocketAddress} import java.util import java.util.UUID @@ -28,10 +28,6 @@ import java.util.concurrent.TimeUnit import _root_.akka.actor._ import _root_.akka.pattern.ask import _root_.akka.util.Timeout -import com.codahale.metrics.json.MetricsModule -import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, MemoryUsageGaugeSet} -import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry} -import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.configuration._ @@ -155,20 +151,8 @@ class TaskManager( /** Handler for distributed files cached by this TaskManager */ protected val fileCache = new 
FileCache(config.configuration) - /** Registry of metrics periodically transmitted to the JobManager */ - private val metricRegistry = TaskManager.createMetricsRegistry() - private var taskManagerMetricGroup : TaskManagerMetricGroup = _ - /** Metric serialization */ - private val metricRegistryMapper: ObjectMapper = new ObjectMapper() - .registerModule( - new MetricsModule( - TimeUnit.SECONDS, - TimeUnit.MILLISECONDS, - false, - MetricFilter.ALL)) - /** Actors which want to be notified once this task manager has been * registered at the job manager */ private val waitForRegistration = scala.collection.mutable.Set[ActorRef]() @@ -1332,7 +1316,6 @@ class TaskManager( protected def sendHeartbeatToJobManager(): Unit = { try { log.debug("Sending heartbeat to JobManager") - val metricsReport: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry) val accumulatorEvents = scala.collection.mutable.Buffer[AccumulatorSnapshot]() @@ -1351,7 +1334,7 @@ class TaskManager( } currentJobManager foreach { - jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents)) + jm => jm ! decorateMessage(Heartbeat(instanceID, accumulatorEvents)) } } catch { @@ -2481,49 +2464,4 @@ object TaskManager { case (_, id) => throw new IllegalArgumentException(s"Temporary file directory #$id is null.") } } - - /** - * Creates the registry of default metrics, including stats about garbage collection, memory - * usage, and system CPU load. - * - * @return The registry with the default metrics. 
- */ - private def createMetricsRegistry() : MetricRegistry = { - val metricRegistry = new MetricRegistry() - - // register default metrics - metricRegistry.register("gc", new GarbageCollectorMetricSet) - metricRegistry.register("memory", new MemoryUsageGaugeSet) - metricRegistry.register("direct-memory", new BufferPoolMetricSet( - ManagementFactory.getPlatformMBeanServer)) - metricRegistry.register("load", new Gauge[Double] { - override def getValue: Double = - ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage() - }) - - val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean() - - try { - val fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean") - .getMethods() - .find( _.getName() == "getProcessCpuLoad" ) - - // verify that we can invoke the method - fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0) - - metricRegistry.register("cpuLoad", new Gauge[Double] { - override def getValue: Double = fetchCPULoadMethod - .map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0) - }) - } - catch { - case t: Throwable => - LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + - " - CPU load metrics will not be available.") - metricRegistry.register("cpuLoad", new Gauge[Double] { - override def getValue: Double = -1.0 - }) - } - metricRegistry - } } http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java index f3747c8..2b10e09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java @@ -186,12 +186,12 @@ public class InstanceManagerTest{ probe3.getRef(), ici3, hardwareDescription, 1, leaderSessionID); // report some immediate heart beats - assertTrue(cm.reportHeartBeat(instanceID1, new byte[] {})); - assertTrue(cm.reportHeartBeat(instanceID2, new byte[] {})); - assertTrue(cm.reportHeartBeat(instanceID3, new byte[] {})); + assertTrue(cm.reportHeartBeat(instanceID1)); + assertTrue(cm.reportHeartBeat(instanceID2)); + assertTrue(cm.reportHeartBeat(instanceID3)); // report heart beat for non-existing instance - assertFalse(cm.reportHeartBeat(new InstanceID(), new byte[] {})); + assertFalse(cm.reportHeartBeat(new InstanceID())); final long WAIT = 200; CommonTestUtils.sleepUninterruptibly(WAIT); @@ -205,7 +205,7 @@ public class InstanceManagerTest{ long h3 = it.next().getLastHeartBeat(); // send one heart beat again and verify that the - assertTrue(cm.reportHeartBeat(instance1.getId(), new byte[] {})); + assertTrue(cm.reportHeartBeat(instance1.getId())); long newH1 = instance1.getLastHeartBeat(); long now = System.currentTimeMillis(); @@ -244,7 +244,7 @@ public class InstanceManagerTest{ // expected } - assertFalse(cm.reportHeartBeat(new InstanceID(), new byte[] {})); + assertFalse(cm.reportHeartBeat(new InstanceID())); } catch (Exception e) { System.err.println(e.getMessage());
