Repository: flink
Updated Branches:
  refs/heads/master 5c83e787c -> 3ab97ae3d


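[FLINK-4784] Make MetricQueryService actor names unique

When a ResourceID is supplied, the MetricQueryService actor is now named
METRIC_QUERY_SERVICE_NAME + "_" + <resource ID string>. TaskManager and
LocalFlinkMiniCluster pass their resource ID; the JobManager passes null and
keeps the plain METRIC_QUERY_SERVICE_NAME. MetricFetcher builds the matching
actor paths.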

This closes #2636.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab97ae3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ab97ae3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ab97ae3

Branch: refs/heads/master
Commit: 3ab97ae3d57c6b3c70c2c1103565621ccefe17f0
Parents: 5c83e78
Author: zentol <[email protected]>
Authored: Mon Oct 10 10:40:10 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Oct 21 12:03:59 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/webmonitor/metrics/MetricFetcher.java     | 4 ++--
 .../flink/runtime/webmonitor/metrics/MetricFetcherTest.java | 5 ++++-
 .../org/apache/flink/runtime/metrics/MetricRegistry.java    | 6 ++++--
 .../flink/runtime/metrics/dump/MetricQueryService.java      | 9 +++++++--
 .../org/apache/flink/runtime/jobmanager/JobManager.scala    | 2 +-
 .../flink/runtime/minicluster/LocalFlinkMiniCluster.scala   | 2 +-
 .../org/apache/flink/runtime/taskmanager/TaskManager.scala  | 2 +-
 .../flink/runtime/metrics/dump/MetricQueryServiceTest.java  | 2 +-
 8 files changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 7a39a53..3b10587 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -126,7 +126,7 @@ public class MetricFetcher {
                                logErrorOnFailure(jobDetailsFuture, "Fetching 
of JobDetails failed.");
 
                                String jobManagerPath = jobManager.path();
-                               String queryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
"MetricQueryService";
+                               String queryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
                                ActorRef jobManagerQueryService = 
actorSystem.actorFor(queryServicePath);
 
                                queryMetrics(jobManagerQueryService);
@@ -148,7 +148,7 @@ public class MetricFetcher {
                                                                
activeTaskManagers.add(taskManager.getId().toString());
 
                                                                String 
taskManagerPath = taskManager.getActorGateway().path();
-                                                               String 
queryServicePath = taskManagerPath.substring(0, 
taskManagerPath.lastIndexOf('/') + 1) + "MetricQueryService";
+                                                               String 
queryServicePath = taskManagerPath.substring(0, 
taskManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManager.getTaskManagerID().getResourceIdString();
                                                                ActorRef 
taskManagerQueryService = actorSystem.actorFor(queryServicePath);
 
                                                                
queryMetrics(taskManagerQueryService);

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index e0cfe26..356ce67 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -73,12 +74,14 @@ public class MetricFetcherTest extends TestLogger {
                // ========= setup TaskManager 
=================================================================================
                JobID jobID = new JobID();
                InstanceID tmID = new InstanceID();
+               ResourceID tmRID = new ResourceID(tmID.toString());
                ActorGateway taskManagerGateway = mock(ActorGateway.class);
                when(taskManagerGateway.path()).thenReturn("/tm/address");
 
                Instance taskManager = mock(Instance.class);
                
when(taskManager.getActorGateway()).thenReturn(taskManagerGateway);
                when(taskManager.getId()).thenReturn(tmID);
+               when(taskManager.getTaskManagerID()).thenReturn(tmRID);
 
                // ========= setup JobManager 
==================================================================================
                JobDetails details = mock(JobDetails.class);
@@ -106,7 +109,7 @@ public class MetricFetcherTest extends TestLogger {
 
                ActorSystem actorSystem = mock(ActorSystem.class);
                when(actorSystem.actorFor(eq("/jm/" + 
METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService);
-               when(actorSystem.actorFor(eq("/tm/" + 
METRIC_QUERY_SERVICE_NAME))).thenReturn(tmQueryService);
+               when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME 
+ "_" + tmRID.getResourceIdString()))).thenReturn(tmQueryService);
 
                MetricFetcher.BasicGateway jmQueryServiceGateway = 
mock(MetricFetcher.BasicGateway.class);
                
when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()),
 any(FiniteDuration.class)))

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index e68339b..219927d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
@@ -143,10 +144,11 @@ public class MetricRegistry {
         * Initializes the MetricQueryService.
         * 
         * @param actorSystem ActorSystem to create the MetricQueryService on
+        * @param resourceID resource ID used to disambiguate the actor name
      */
-       public void startQueryService(ActorSystem actorSystem) {
+       public void startQueryService(ActorSystem actorSystem, ResourceID 
resourceID) {
                try {
-                       queryService = 
MetricQueryService.startMetricQueryService(actorSystem);
+                       queryService = 
MetricQueryService.startMetricQueryService(actorSystem, resourceID);
                } catch (Exception e) {
                        LOG.warn("Could not start MetricDumpActor. No metrics 
will be submitted to the WebInterface.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 6e0b443..20bc258 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,10 +160,14 @@ public class MetricQueryService extends UntypedActor {
         * Starts the MetricQueryService actor in the given actor system.
         *
         * @param actorSystem The actor system running the MetricQueryService
+        * @param resourceID resource ID to disambiguate the actor name
         * @return actor reference to the MetricQueryService
         */
-       public static ActorRef startMetricQueryService(ActorSystem actorSystem) 
{
-               return 
actorSystem.actorOf(Props.create(MetricQueryService.class), 
METRIC_QUERY_SERVICE_NAME);
+       public static ActorRef startMetricQueryService(ActorSystem actorSystem, 
ResourceID resourceID) {
+               String actorName = resourceID == null
+                       ? METRIC_QUERY_SERVICE_NAME
+                       : METRIC_QUERY_SERVICE_NAME + "_" + 
resourceID.getResourceIdString();
+               return 
actorSystem.actorOf(Props.create(MetricQueryService.class), actorName);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 18ded6f..1539b8f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2698,7 +2698,7 @@ object JobManager {
     
     metricsRegistry match {
       case Some(registry) =>
-        registry.startQueryService(actorSystem)
+        registry.startQueryService(actorSystem, null)
       case None =>
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 2f453a3..cad2648 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -219,7 +219,7 @@ class LocalFlinkMiniCluster(
       leaderRetrievalService,
       metricsRegistry)
 
-    metricsRegistry.startQueryService(system)
+    metricsRegistry.startQueryService(system, resourceID)
 
     system.actorOf(props, taskManagerActorName)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1017ea0..1df6f92 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1876,7 +1876,7 @@ object TaskManager {
       leaderRetrievalService,
       metricsRegistry)
 
-    metricsRegistry.startQueryService(actorSystem)
+    metricsRegistry.startQueryService(actorSystem, resourceID)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab97ae3/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index e5ace68..0104e3e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -44,7 +44,7 @@ public class MetricQueryServiceTest extends TestLogger {
        public void testCreateDump() throws Exception {
 
                ActorSystem s = AkkaUtils.createLocalActorSystem(new 
Configuration());
-               ActorRef serviceActor = 
MetricQueryService.startMetricQueryService(s);
+               ActorRef serviceActor = 
MetricQueryService.startMetricQueryService(s, null);
                TestActorRef testActorRef = TestActorRef.create(s, 
Props.create(TestActor.class));
                TestActor testActor = (TestActor) 
testActorRef.underlyingActor();
 

Reply via email to