tillrohrmann closed pull request #6429: [FLINK-9943] Support 
TaskManagerMetricQueryServicePaths msg in JobManager Actor
URL: https://github.com/apache/flink/pull/6429
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 37a27c724ef..f26b415aec7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -47,7 +47,6 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.reflect.ClassTag$;
@@ -266,20 +265,15 @@ public String getHostname() {
 
        @Override
        public CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
-               return requestTaskManagerInstances(timeout)
-                       .thenApply(
-                               (Collection<Instance> instances) ->
-                                       instances
-                                               .stream()
-                                               .map(
-                                                       (Instance instance) -> {
-                                                               final String 
taskManagerAddress = instance.getTaskManagerGateway().getAddress();
-                                                               final String 
taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) +
-                                                                       
MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + 
instance.getTaskManagerID().getResourceIdString();
-
-                                                               return 
Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
-                                                       })
-                                               .collect(Collectors.toList()));
+               
CompletableFuture<JobManagerMessages.TaskManagerMetricQueryServicePaths> 
taskManagerQueryPathsFuture =
+                               FutureUtils.toJava(
+                                               jobManagerGateway
+                                                               
.ask(JobManagerMessages.getRequestTaskManagerMetricQueryServicePaths(), 
FutureUtils.toFiniteDuration(timeout))
+                                                               
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.TaskManagerMetricQueryServicePaths.class)));
+
+               return taskManagerQueryPathsFuture.thenApply(
+                               
JobManagerMessages.TaskManagerMetricQueryServicePaths::asJavaCollection
+               );
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7799a78b089..0d356b065ba 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -31,6 +31,7 @@ import akka.pattern.ask
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.core.io.InputSplitAssigner
@@ -72,6 +73,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
 import 
org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.messages.{Acknowledge, 
FlinkJobNotFoundException, StackTrace}
+import org.apache.flink.runtime.metrics.dump.MetricQueryService
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1051,6 +1053,21 @@ class JobManager(
         )
       )
 
+    case RequestTaskManagerMetricQueryServicePaths =>
+      val instances = instanceManager.getAllRegisteredInstances
+      val paths = instances.asScala map {
+        instance => {
+          val taskManagerAddress = instance.getTaskManagerGateway.getAddress
+          val queryServicePath =
+            taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) +
+            MetricQueryService.METRIC_QUERY_SERVICE_NAME +
+            '_' + instance.getTaskManagerID.getResourceIdString
+          Tuple2.of(instance.getTaskManagerID, queryServicePath)
+        }
+      }
+      sender ! decorateMessage(TaskManagerMetricQueryServicePaths(paths))
+
+
     case RequestTaskManagerInstance(resourceId) =>
       sender ! decorateMessage(
         
TaskManagerInstance(Option(instanceManager.getRegisteredInstance(resourceId)))
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 5c19c7aa1b5..d1ff45c3a5f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
+import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.blob.PermanentBlobKey
 import org.apache.flink.runtime.client.{JobStatusMessage, 
SerializedJobExecutionResult}
@@ -430,6 +431,27 @@ object JobManagerMessages {
    */
   case class TaskManagerInstance(instance: Option[Instance])
 
+  /**
+    * Requests the metric query service paths of the registered task manager.
+    */
+  case object RequestTaskManagerMetricQueryServicePaths
+
+  /**
+    * Returns the metric query service paths of the registered task manager. 
This is in response to
+    * [[RequestTaskManagerMetricQueryServicePaths]]
+    */
+  case class TaskManagerMetricQueryServicePaths(paths: 
Iterable[Tuple2[ResourceID, String]]) {
+    def asJavaIterable: java.lang.Iterable[Tuple2[ResourceID, String]] = {
+      import scala.collection.JavaConverters._
+      paths.asJava
+    }
+
+    def asJavaCollection: java.util.Collection[Tuple2[ResourceID, String]] = {
+      import scala.collection.JavaConverters._
+      paths.asJavaCollection
+    }
+  }
+
   /**
    * Requests stack trace messages of the task manager
    *
@@ -554,6 +576,10 @@ object JobManagerMessages {
     RequestRegisteredTaskManagers
   }
 
+  def getRequestTaskManagerMetricQueryServicePaths : AnyRef = {
+    RequestTaskManagerMetricQueryServicePaths
+  }
+
   def getRequestJobManagerStatus : AnyRef = {
     RequestJobManagerStatus
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to