[FLINK-7867] Start MetricQueryService in TaskManagerRunner This closes #4853.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c79afaa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c79afaa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c79afaa Branch: refs/heads/master Commit: 6c79afaa668d22e1192dc655e54cc3502e33bc89 Parents: 9f6f309 Author: Till Rohrmann <[email protected]> Authored: Wed Oct 18 16:54:44 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Wed Nov 1 15:48:00 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/taskexecutor/TaskManagerRunner.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6c79afaa/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 26671a8..782ab07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; @@ -47,6 +48,8 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; + +import akka.actor.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,6 +114,9 @@ public class TaskManagerRunner implements FatalErrorHandler { metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)); + final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, resourceId); + taskManager = startTaskManager( configuration, resourceId, @@ -144,7 +150,11 @@ public class TaskManagerRunner implements FatalErrorHandler { exception = e; } - metricRegistry.shutdown(); + try { + metricRegistry.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } rpcService.stopService();
