[
https://issues.apache.org/jira/browse/FLINK-9943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647742#comment-16647742
]
ASF GitHub Bot commented on FLINK-9943:
---------------------------------------
tillrohrmann closed pull request #6429: [FLINK-9943] Support
TaskManagerMetricQueryServicePaths msg in JobManager Actor
URL: https://github.com/apache/flink/pull/6429
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 37a27c724ef..f26b415aec7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -47,7 +47,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.stream.Collectors;
import scala.Option;
import scala.reflect.ClassTag$;
@@ -266,20 +265,15 @@ public String getHostname() {
@Override
public CompletableFuture<Collection<Tuple2<ResourceID, String>>>
requestTaskManagerMetricQueryServicePaths(Time timeout) {
- return requestTaskManagerInstances(timeout)
- .thenApply(
- (Collection<Instance> instances) ->
- instances
- .stream()
- .map(
- (Instance instance) -> {
- final String
taskManagerAddress = instance.getTaskManagerGateway().getAddress();
- final String
taskManagerMetricQuerServicePath = taskManagerAddress.substring(0,
taskManagerAddress.lastIndexOf('/') + 1) +
-
MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' +
instance.getTaskManagerID().getResourceIdString();
-
- return
Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
- })
- .collect(Collectors.toList()));
+
CompletableFuture<JobManagerMessages.TaskManagerMetricQueryServicePaths>
taskManagerQueryPathsFuture =
+ FutureUtils.toJava(
+ jobManagerGateway
+
.ask(JobManagerMessages.getRequestTaskManagerMetricQueryServicePaths(),
FutureUtils.toFiniteDuration(timeout))
+
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.TaskManagerMetricQueryServicePaths.class)));
+
+ return taskManagerQueryPathsFuture.thenApply(
+
JobManagerMessages.TaskManagerMetricQueryServicePaths::asJavaCollection
+ );
}
@Override
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7799a78b089..0d356b065ba 100644
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -31,6 +31,7 @@ import akka.pattern.ask
import grizzled.slf4j.Logger
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.configuration._
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.core.io.InputSplitAssigner
@@ -72,6 +73,7 @@ import
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
import
org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
import org.apache.flink.runtime.messages.{Acknowledge,
FlinkJobNotFoundException, StackTrace}
+import org.apache.flink.runtime.metrics.dump.MetricQueryService
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration,
MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1051,6 +1053,21 @@ class JobManager(
)
)
+ case RequestTaskManagerMetricQueryServicePaths =>
+ val instances = instanceManager.getAllRegisteredInstances
+ val paths = instances.asScala map {
+ instance => {
+ val taskManagerAddress = instance.getTaskManagerGateway.getAddress
+ val queryServicePath =
+ taskManagerAddress.substring(0,
taskManagerAddress.lastIndexOf('/') + 1) +
+ MetricQueryService.METRIC_QUERY_SERVICE_NAME +
+ '_' + instance.getTaskManagerID.getResourceIdString
+ Tuple2.of(instance.getTaskManagerID, queryServicePath)
+ }
+ }
+ sender ! decorateMessage(TaskManagerMetricQueryServicePaths(paths))
+
+
case RequestTaskManagerInstance(resourceId) =>
sender ! decorateMessage(
TaskManagerInstance(Option(instanceManager.getRegisteredInstance(resourceId)))
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 5c19c7aa1b5..d1ff45c3a5f 100644
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -23,6 +23,7 @@ import java.util.UUID
import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
+import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.blob.PermanentBlobKey
import org.apache.flink.runtime.client.{JobStatusMessage,
SerializedJobExecutionResult}
@@ -430,6 +431,27 @@ object JobManagerMessages {
*/
case class TaskManagerInstance(instance: Option[Instance])
+ /**
+ * Requests the metric query service paths of the registered task manager.
+ */
+ case object RequestTaskManagerMetricQueryServicePaths
+
+ /**
+ * Returns the metric query service paths of the registered task manager.
This is in response to
+ * [[RequestTaskManagerMetricQueryServicePaths]]
+ */
+ case class TaskManagerMetricQueryServicePaths(paths:
Iterable[Tuple2[ResourceID, String]]) {
+ def asJavaIterable: java.lang.Iterable[Tuple2[ResourceID, String]] = {
+ import scala.collection.JavaConverters._
+ paths.asJava
+ }
+
+ def asJavaCollection: java.util.Collection[Tuple2[ResourceID, String]] = {
+ import scala.collection.JavaConverters._
+ paths.asJavaCollection
+ }
+ }
+
/**
* Requests stack trace messages of the task manager
*
@@ -554,6 +576,10 @@ object JobManagerMessages {
RequestRegisteredTaskManagers
}
+ def getRequestTaskManagerMetricQueryServicePaths : AnyRef = {
+ RequestTaskManagerMetricQueryServicePaths
+ }
+
def getRequestJobManagerStatus : AnyRef = {
RequestJobManagerStatus
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Support TaskManagerMetricQueryServicePaths msg in JobManager Actor
> ------------------------------------------------------------------
>
> Key: FLINK-9943
> URL: https://issues.apache.org/jira/browse/FLINK-9943
> Project: Flink
> Issue Type: New Feature
> Components: Core
> Affects Versions: 1.5.0, 1.5.1
> Reporter: Chuanlei Ni
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The reasons are as follows
> # AkkaJobManagerGateway wraps jm actor ref to support such functionality by
> request RegisteredTaskManagers firstly and request task manager actor to get
> metric query service path one by one. the procedure above is resource-wasted.
> It will be more efficient if we support this functionality in the jm actor
> # we can expose flink metric system directly to external system (such as
> flink client and the like) to support more features in future. For now,
> metric system has been exposed partially because Instance can not (and should
> not) be transfered remotely. This feature will make metrics exposure
> consistent.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)