[FLINK-7867] Start MetricQueryService in TaskManagerRunner

This closes #4853.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c79afaa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c79afaa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c79afaa

Branch: refs/heads/master
Commit: 6c79afaa668d22e1192dc655e54cc3502e33bc89
Parents: 9f6f309
Author: Till Rohrmann <[email protected]>
Authored: Wed Oct 18 16:54:44 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 1 15:48:00 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/taskexecutor/TaskManagerRunner.java   | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c79afaa/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 26671a8..782ab07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -47,6 +48,8 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,6 +114,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
                metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
 
+               final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+               metricRegistry.startQueryService(actorSystem, resourceId);
+
                taskManager = startTaskManager(
                        configuration,
                        resourceId,
@@ -144,7 +150,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
                                exception = e;
                        }
 
-                       metricRegistry.shutdown();
+                       try {
+                               metricRegistry.shutdown();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, exception);
+                       }
 
                        rpcService.stopService();
 

Reply via email to