[
https://issues.apache.org/jira/browse/FLINK-7381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119662#comment-16119662
]
ASF GitHub Bot commented on FLINK-7381:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4492#discussion_r132141995
--- Diff:
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
---
@@ -101,127 +98,112 @@ public void update() {
private void fetchMetrics() {
try {
- Option<scala.Tuple2<ActorGateway, Integer>>
jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
- if (jobManagerGatewayAndWebPort.isDefined()) {
- ActorGateway jobManager =
jobManagerGatewayAndWebPort.get()._1();
+ Optional<JobManagerGateway> optJobManagerGateway =
retriever.getJobManagerGatewayNow();
+ if (optJobManagerGateway.isPresent()) {
+ final JobManagerGateway jobManagerGateway =
optJobManagerGateway.get();
/**
* Remove all metrics that belong to a job that
is not running and no longer archived.
*/
- Future<Object> jobDetailsFuture =
jobManager.ask(new RequestJobDetails(true, true), timeout);
- jobDetailsFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object
result) throws Throwable {
- MultipleJobsDetails
details = (MultipleJobsDetails) result;
+ CompletableFuture<MultipleJobsDetails>
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+
+ jobDetailsFuture.whenCompleteAsync(
+ (MultipleJobsDetails jobDetails,
Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Fetching of
JobDetails failed.", throwable);
+ } else {
ArrayList<String>
toRetain = new ArrayList<>();
- for (JobDetails job :
details.getRunningJobs()) {
+ for (JobDetails job :
jobDetails.getRunningJobs()) {
toRetain.add(job.getJobId().toString());
}
- for (JobDetails job :
details.getFinishedJobs()) {
+ for (JobDetails job :
jobDetails.getFinishedJobs()) {
toRetain.add(job.getJobId().toString());
}
synchronized (metrics) {
metrics.jobs.keySet().retainAll(toRetain);
}
}
- }, ctx);
- logErrorOnFailure(jobDetailsFuture, "Fetching
of JobDetails failed.");
+ },
+ executor);
- String jobManagerPath = jobManager.path();
- String queryServicePath =
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
- ActorRef jobManagerQueryService =
actorSystem.actorFor(queryServicePath);
+ String jobManagerPath =
jobManagerGateway.getAddress();
+ String jmQueryServicePath =
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
- queryMetrics(jobManagerQueryService);
+ retrieveAndQueryMetrics(jmQueryServicePath);
/**
* We first request the list of all registered
task managers from the job manager, and then
* request the respective metric dump from each
task manager.
*
* <p>All stored metrics that do not belong to
a registered task manager will be removed.
*/
- Future<Object> registeredTaskManagersFuture =
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
- registeredTaskManagersFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object
result) throws Throwable {
- Iterable<Instance>
taskManagers = ((JobManagerMessages.RegisteredTaskManagers)
result).asJavaIterable();
- List<String>
activeTaskManagers = new ArrayList<>();
- for (Instance
taskManager : taskManagers) {
-
activeTaskManagers.add(taskManager.getId().toString());
-
- String
taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
- String
queryServicePath = taskManagerPath.substring(0,
taskManagerPath.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" +
taskManager.getTaskManagerID().getResourceIdString();
- ActorRef
taskManagerQueryService = actorSystem.actorFor(queryServicePath);
-
-
queryMetrics(taskManagerQueryService);
- }
- synchronized (metrics)
{ // remove all metrics belonging to unregistered task managers
+ CompletableFuture<Collection<Instance>>
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+ taskManagersFuture.whenCompleteAsync(
+ (Collection<Instance> taskManagers,
Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Fetching
list of registered TaskManagers failed.", throwable);
+ } else {
+ List<String>
activeTaskManagers = taskManagers.stream().map(
+
taskManagerInstance -> {
+ final
String taskManagerAddress =
taskManagerInstance.getTaskManagerGateway().getAddress();
+ final
String tmQueryServicePath = taskManagerAddress.substring(0,
taskManagerAddress.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" +
taskManagerInstance.getTaskManagerID().getResourceIdString();
+
+
retrieveAndQueryMetrics(tmQueryServicePath);
+
+ return
taskManagerInstance.getId().toString();
+
}).collect(Collectors.toList());
+
+ synchronized (metrics) {
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
}
}
- }, ctx);
- logErrorOnFailure(registeredTaskManagersFuture,
"Fetchin list of registered TaskManagers failed.");
+ },
+ executor);
}
} catch (Exception e) {
LOG.warn("Exception while fetching metrics.", e);
}
}
- private void logErrorOnFailure(Future<Object> future, final String
message) {
- future.onFailure(new OnFailure() {
- @Override
- public void onFailure(Throwable failure) throws
Throwable {
- LOG.debug(message, failure);
- }
- }, ctx);
- }
-
/**
- * Requests a metric dump from the given actor.
+ * Retrieves and queries the specified QueryServiceGateway.
*
- * @param actor ActorRef to request the dump from
- */
- private void queryMetrics(ActorRef actor) {
- Future<Object> metricQueryFuture = new
BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
- metricQueryFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object result) throws
Throwable {
- addMetrics(result);
+ * @param queryServicePath specifying the QueryServiceGateway
+ */
+ private void retrieveAndQueryMetrics(String queryServicePath) {
+ final CompletableFuture<MetricQueryServiceGateway>
jmQueryServiceGatewayFuture =
queryServiceRetriever.retrieveService(queryServicePath);
--- End diff --
True, an artifact of a refactoring. Will change it.
> Decouple WebRuntimeMonitor from ActorGateway
> --------------------------------------------
>
> Key: FLINK-7381
> URL: https://issues.apache.org/jira/browse/FLINK-7381
> Project: Flink
> Issue Type: Sub-task
> Components: Webfrontend
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
>
> The {{WebRuntimeMonitor}} has a hard wired dependency on the {{ActorGateway}}
> in order to communicate with the {{JobManager}}. In order to make it work
> with the {{JobMaster}} (Flip-6), we have to abstract this dependency away. I
> propose to add a {{JobManagerGateway}} interface which can be implemented
> using Akka for the old {{JobManager}} code. The Flip-6 {{JobMasterGateway}}
> can then directly inherit from this interface.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)