Repository: flink Updated Branches: refs/heads/master 5c83e787c -> 3ab97ae3d
[FLINK-4784] Unique MetricQueryService actor names This closes #2636. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab97ae3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ab97ae3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ab97ae3 Branch: refs/heads/master Commit: 3ab97ae3d57c6b3c70c2c1103565621ccefe17f0 Parents: 5c83e78 Author: zentol <[email protected]> Authored: Mon Oct 10 10:40:10 2016 +0200 Committer: zentol <[email protected]> Committed: Fri Oct 21 12:03:59 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/webmonitor/metrics/MetricFetcher.java | 4 ++-- .../flink/runtime/webmonitor/metrics/MetricFetcherTest.java | 5 ++++- .../org/apache/flink/runtime/metrics/MetricRegistry.java | 6 ++++-- .../flink/runtime/metrics/dump/MetricQueryService.java | 9 +++++++-- .../org/apache/flink/runtime/jobmanager/JobManager.scala | 2 +- .../flink/runtime/minicluster/LocalFlinkMiniCluster.scala | 2 +- .../org/apache/flink/runtime/taskmanager/TaskManager.scala | 2 +- .../flink/runtime/metrics/dump/MetricQueryServiceTest.java | 2 +- 8 files changed, 21 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 7a39a53..3b10587 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -126,7 +126,7 @@ public class MetricFetcher { logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed."); String jobManagerPath = jobManager.path(); - String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + "MetricQueryService"; + String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath); queryMetrics(jobManagerQueryService); @@ -148,7 +148,7 @@ public class MetricFetcher { activeTaskManagers.add(taskManager.getId().toString()); String taskManagerPath = taskManager.getActorGateway().path(); - String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + "MetricQueryService"; + String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString(); ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath); queryMetrics(taskManagerQueryService); http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java index e0cfe26..356ce67 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; @@ -73,12 +74,14 @@ public class MetricFetcherTest extends TestLogger { // ========= setup TaskManager ================================================================================= JobID jobID = new JobID(); InstanceID tmID = new InstanceID(); + ResourceID tmRID = new ResourceID(tmID.toString()); ActorGateway taskManagerGateway = mock(ActorGateway.class); when(taskManagerGateway.path()).thenReturn("/tm/address"); Instance taskManager = mock(Instance.class); when(taskManager.getActorGateway()).thenReturn(taskManagerGateway); when(taskManager.getId()).thenReturn(tmID); + when(taskManager.getTaskManagerID()).thenReturn(tmRID); // ========= setup JobManager ================================================================================== JobDetails details = mock(JobDetails.class); @@ -106,7 +109,7 @@ public class MetricFetcherTest extends TestLogger { ActorSystem actorSystem = mock(ActorSystem.class); when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService); - when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(tmQueryService); + when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(tmQueryService); MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class); when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class))) http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index e68339b..219927d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; @@ -143,10 +144,11 @@ public class MetricRegistry { * Initializes the MetricQueryService. * * @param actorSystem ActorSystem to create the MetricQueryService on + * @param resourceID resource ID used to disambiguate the actor name */ - public void startQueryService(ActorSystem actorSystem) { + public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) { try { - queryService = MetricQueryService.startMetricQueryService(actorSystem); + queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID); } catch (Exception e) { LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java index 6e0b443..20bc258 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java @@ -29,6 +29,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,10 +160,14 @@ public class MetricQueryService extends UntypedActor { * Starts the MetricQueryService actor in the given actor system. * * @param actorSystem The actor system running the MetricQueryService + * @param resourceID resource ID to disambiguate the actor name * @return actor reference to the MetricQueryService */ - public static ActorRef startMetricQueryService(ActorSystem actorSystem) { - return actorSystem.actorOf(Props.create(MetricQueryService.class), METRIC_QUERY_SERVICE_NAME); + public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) { + String actorName = resourceID == null + ? METRIC_QUERY_SERVICE_NAME + : METRIC_QUERY_SERVICE_NAME + "_" + resourceID.getResourceIdString(); + return actorSystem.actorOf(Props.create(MetricQueryService.class), actorName); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 18ded6f..1539b8f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2698,7 +2698,7 @@ object JobManager { metricsRegistry match { case Some(registry) => - registry.startQueryService(actorSystem) + registry.startQueryService(actorSystem, null) case None => } http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 2f453a3..cad2648 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -219,7 +219,7 @@ class LocalFlinkMiniCluster( leaderRetrievalService, metricsRegistry) - metricsRegistry.startQueryService(system) + metricsRegistry.startQueryService(system, resourceID) system.actorOf(props, taskManagerActorName) } http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 1017ea0..1df6f92 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1876,7 +1876,7 @@ object TaskManager { leaderRetrievalService, metricsRegistry) - metricsRegistry.startQueryService(actorSystem) + metricsRegistry.startQueryService(actorSystem, resourceID) taskManagerActorName match { case Some(actorName) => actorSystem.actorOf(tmProps, actorName) http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java index e5ace68..0104e3e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -44,7 +44,7 @@ public class MetricQueryServiceTest extends TestLogger { public void testCreateDump() throws Exception { ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration()); - ActorRef serviceActor = MetricQueryService.startMetricQueryService(s); + ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null); TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class)); TestActor testActor = (TestActor) testActorRef.underlyingActor();
