tillrohrmann closed pull request #6429: [FLINK-9943] Support
TaskManagerMetricQueryServicePaths msg in JobManager Actor
URL: https://github.com/apache/flink/pull/6429
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 37a27c724ef..f26b415aec7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -47,7 +47,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.stream.Collectors;
import scala.Option;
import scala.reflect.ClassTag$;
@@ -266,20 +265,15 @@ public String getHostname() {
@Override
public CompletableFuture<Collection<Tuple2<ResourceID, String>>>
requestTaskManagerMetricQueryServicePaths(Time timeout) {
- return requestTaskManagerInstances(timeout)
- .thenApply(
- (Collection<Instance> instances) ->
- instances
- .stream()
- .map(
- (Instance instance) -> {
- final String
taskManagerAddress = instance.getTaskManagerGateway().getAddress();
- final String
taskManagerMetricQuerServicePath = taskManagerAddress.substring(0,
taskManagerAddress.lastIndexOf('/') + 1) +
-
MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' +
instance.getTaskManagerID().getResourceIdString();
-
- return
Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
- })
- .collect(Collectors.toList()));
+
CompletableFuture<JobManagerMessages.TaskManagerMetricQueryServicePaths>
taskManagerQueryPathsFuture =
+ FutureUtils.toJava(
+ jobManagerGateway
+
.ask(JobManagerMessages.getRequestTaskManagerMetricQueryServicePaths(),
FutureUtils.toFiniteDuration(timeout))
+
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.TaskManagerMetricQueryServicePaths.class)));
+
+ return taskManagerQueryPathsFuture.thenApply(
+
JobManagerMessages.TaskManagerMetricQueryServicePaths::asJavaCollection
+ );
}
@Override
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7799a78b089..0d356b065ba 100644
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -31,6 +31,7 @@ import akka.pattern.ask
import grizzled.slf4j.Logger
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.configuration._
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.core.io.InputSplitAssigner
@@ -72,6 +73,7 @@ import
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
import
org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
import org.apache.flink.runtime.messages.{Acknowledge,
FlinkJobNotFoundException, StackTrace}
+import org.apache.flink.runtime.metrics.dump.MetricQueryService
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration,
MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1051,6 +1053,21 @@ class JobManager(
)
)
+ case RequestTaskManagerMetricQueryServicePaths =>
+ val instances = instanceManager.getAllRegisteredInstances
+ val paths = instances.asScala map {
+ instance => {
+ val taskManagerAddress = instance.getTaskManagerGateway.getAddress
+ val queryServicePath =
+ taskManagerAddress.substring(0,
taskManagerAddress.lastIndexOf('/') + 1) +
+ MetricQueryService.METRIC_QUERY_SERVICE_NAME +
+ '_' + instance.getTaskManagerID.getResourceIdString
+ Tuple2.of(instance.getTaskManagerID, queryServicePath)
+ }
+ }
+ sender ! decorateMessage(TaskManagerMetricQueryServicePaths(paths))
+
+
case RequestTaskManagerInstance(resourceId) =>
sender ! decorateMessage(
TaskManagerInstance(Option(instanceManager.getRegisteredInstance(resourceId)))
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 5c19c7aa1b5..d1ff45c3a5f 100644
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -23,6 +23,7 @@ import java.util.UUID
import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
+import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.blob.PermanentBlobKey
import org.apache.flink.runtime.client.{JobStatusMessage,
SerializedJobExecutionResult}
@@ -430,6 +431,27 @@ object JobManagerMessages {
*/
case class TaskManagerInstance(instance: Option[Instance])
+ /**
+ * Requests the metric query service paths of all registered task managers.
+ */
+ case object RequestTaskManagerMetricQueryServicePaths
+
+ /**
+ * Returns the metric query service paths of all registered task managers.
This is in response to
+ * [[RequestTaskManagerMetricQueryServicePaths]]
+ */
+ case class TaskManagerMetricQueryServicePaths(paths:
Iterable[Tuple2[ResourceID, String]]) {
+ def asJavaIterable: java.lang.Iterable[Tuple2[ResourceID, String]] = {
+ import scala.collection.JavaConverters._
+ paths.asJava
+ }
+
+ def asJavaCollection: java.util.Collection[Tuple2[ResourceID, String]] = {
+ import scala.collection.JavaConverters._
+ paths.asJavaCollection
+ }
+ }
+
/**
* Requests stack trace messages of the task manager
*
@@ -554,6 +576,10 @@ object JobManagerMessages {
RequestRegisteredTaskManagers
}
+ def getRequestTaskManagerMetricQueryServicePaths : AnyRef = {
+ RequestTaskManagerMetricQueryServicePaths
+ }
+
def getRequestJobManagerStatus : AnyRef = {
RequestJobManagerStatus
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services