Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4492#discussion_r132141995 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java --- @@ -101,127 +98,112 @@ public void update() { private void fetchMetrics() { try { - Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort(); - if (jobManagerGatewayAndWebPort.isDefined()) { - ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1(); + Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); + if (optJobManagerGateway.isPresent()) { + final JobManagerGateway jobManagerGateway = optJobManagerGateway.get(); /** * Remove all metrics that belong to a job that is not running and no longer archived. */ - Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout); - jobDetailsFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - MultipleJobsDetails details = (MultipleJobsDetails) result; + CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout); + + jobDetailsFuture.whenCompleteAsync( + (MultipleJobsDetails jobDetails, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Fetching of JobDetails failed.", throwable); + } else { ArrayList<String> toRetain = new ArrayList<>(); - for (JobDetails job : details.getRunningJobs()) { + for (JobDetails job : jobDetails.getRunningJobs()) { toRetain.add(job.getJobId().toString()); } - for (JobDetails job : details.getFinishedJobs()) { + for (JobDetails job : jobDetails.getFinishedJobs()) { toRetain.add(job.getJobId().toString()); } synchronized (metrics) { metrics.jobs.keySet().retainAll(toRetain); } } - }, ctx); - logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed."); + }, + executor); - String 
jobManagerPath = jobManager.path(); - String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath); + String jobManagerPath = jobManagerGateway.getAddress(); + String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - queryMetrics(jobManagerQueryService); + retrieveAndQueryMetrics(jmQueryServicePath); /** * We first request the list of all registered task managers from the job manager, and then * request the respective metric dump from each task manager. * * <p>All stored metrics that do not belong to a registered task manager will be removed. */ - Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout); - registeredTaskManagersFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable(); - List<String> activeTaskManagers = new ArrayList<>(); - for (Instance taskManager : taskManagers) { - activeTaskManagers.add(taskManager.getId().toString()); - - String taskManagerPath = taskManager.getTaskManagerGateway().getAddress(); - String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString(); - ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath); - - queryMetrics(taskManagerQueryService); - } - synchronized (metrics) { // remove all metrics belonging to unregistered task managers + CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout); + + taskManagersFuture.whenCompleteAsync( + 
(Collection<Instance> taskManagers, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Fetching list of registered TaskManagers failed.", throwable); + } else { + List<String> activeTaskManagers = taskManagers.stream().map( + taskManagerInstance -> { + final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress(); + final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString(); + + retrieveAndQueryMetrics(tmQueryServicePath); + + return taskManagerInstance.getId().toString(); + }).collect(Collectors.toList()); + + synchronized (metrics) { metrics.taskManagers.keySet().retainAll(activeTaskManagers); } } - }, ctx); - logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed."); + }, + executor); } } catch (Exception e) { LOG.warn("Exception while fetching metrics.", e); } } - private void logErrorOnFailure(Future<Object> future, final String message) { - future.onFailure(new OnFailure() { - @Override - public void onFailure(Throwable failure) throws Throwable { - LOG.debug(message, failure); - } - }, ctx); - } - /** - * Requests a metric dump from the given actor. + * Retrieves and queries the specified QueryServiceGateway. 
* - * @param actor ActorRef to request the dump from - */ - private void queryMetrics(ActorRef actor) { - Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout); - metricQueryFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - addMetrics(result); + * @param queryServicePath specifying the QueryServiceGateway + */ + private void retrieveAndQueryMetrics(String queryServicePath) { + final CompletableFuture<MetricQueryServiceGateway> jmQueryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath); --- End diff -- True, an artifact of a refactoring. Will change it.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---