Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r131675749
  
    --- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 ---
    @@ -101,127 +98,112 @@ public void update() {
     
        private void fetchMetrics() {
                try {
    -                   Option<scala.Tuple2<ActorGateway, Integer>> 
jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    -                   if (jobManagerGatewayAndWebPort.isDefined()) {
    -                           ActorGateway jobManager = 
jobManagerGatewayAndWebPort.get()._1();
    +                   Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getJobManagerGatewayNow();
    +                   if (optJobManagerGateway.isPresent()) {
    +                           final JobManagerGateway jobManagerGateway = 
optJobManagerGateway.get();
     
                                /**
                                 * Remove all metrics that belong to a job that 
is not running and no longer archived.
                                 */
    -                           Future<Object> jobDetailsFuture = 
jobManager.ask(new RequestJobDetails(true, true), timeout);
    -                           jobDetailsFuture
    -                                   .onSuccess(new OnSuccess<Object>() {
    -                                           @Override
    -                                           public void onSuccess(Object 
result) throws Throwable {
    -                                                   MultipleJobsDetails 
details = (MultipleJobsDetails) result;
    +                           CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
    +
    +                           jobDetailsFuture.whenCompleteAsync(
    +                                   (MultipleJobsDetails jobDetails, 
Throwable throwable) -> {
    +                                           if (throwable != null) {
    +                                                   LOG.debug("Fetching of 
JobDetails failed.", throwable);
    +                                           } else {
                                                        ArrayList<String> 
toRetain = new ArrayList<>();
    -                                                   for (JobDetails job : 
details.getRunningJobs()) {
    +                                                   for (JobDetails job : 
jobDetails.getRunningJobs()) {
                                                                
toRetain.add(job.getJobId().toString());
                                                        }
    -                                                   for (JobDetails job : 
details.getFinishedJobs()) {
    +                                                   for (JobDetails job : 
jobDetails.getFinishedJobs()) {
                                                                
toRetain.add(job.getJobId().toString());
                                                        }
                                                        synchronized (metrics) {
                                                                
metrics.jobs.keySet().retainAll(toRetain);
                                                        }
                                                }
    -                                   }, ctx);
    -                           logErrorOnFailure(jobDetailsFuture, "Fetching 
of JobDetails failed.");
    +                                   },
    +                                   executor);
     
    -                           String jobManagerPath = jobManager.path();
    -                           String queryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
    -                           ActorRef jobManagerQueryService = 
actorSystem.actorFor(queryServicePath);
    +                           String jobManagerPath = 
jobManagerGateway.getAddress();
    +                           String jmQueryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
     
    -                           queryMetrics(jobManagerQueryService);
    +                           retrieveAndQueryMetrics(jmQueryServicePath);
     
                                /**
                                 * We first request the list of all registered 
task managers from the job manager, and then
                                 * request the respective metric dump from each 
task manager.
                                 *
                                 * <p>All stored metrics that do not belong to 
a registered task manager will be removed.
                                 */
    -                           Future<Object> registeredTaskManagersFuture = 
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
    -                           registeredTaskManagersFuture
    -                                   .onSuccess(new OnSuccess<Object>() {
    -                                           @Override
    -                                           public void onSuccess(Object 
result) throws Throwable {
    -                                                   Iterable<Instance> 
taskManagers = ((JobManagerMessages.RegisteredTaskManagers) 
result).asJavaIterable();
    -                                                   List<String> 
activeTaskManagers = new ArrayList<>();
    -                                                   for (Instance 
taskManager : taskManagers) {
    -                                                           
activeTaskManagers.add(taskManager.getId().toString());
    -
    -                                                           String 
taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
    -                                                           String 
queryServicePath = taskManagerPath.substring(0, 
taskManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManager.getTaskManagerID().getResourceIdString();
    -                                                           ActorRef 
taskManagerQueryService = actorSystem.actorFor(queryServicePath);
    -
    -                                                           
queryMetrics(taskManagerQueryService);
    -                                                   }
    -                                                   synchronized (metrics) 
{ // remove all metrics belonging to unregistered task managers
    +                           CompletableFuture<Collection<Instance>> 
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
    +
    +                           taskManagersFuture.whenCompleteAsync(
    +                                   (Collection<Instance> taskManagers, 
Throwable throwable) -> {
    +                                           if (throwable != null) {
    +                                                   LOG.debug("Fetching 
list of registered TaskManagers failed.", throwable);
    +                                           } else {
    +                                                   List<String> 
activeTaskManagers = taskManagers.stream().map(
    +                                                           
taskManagerInstance -> {
    +                                                                   final 
String taskManagerAddress = 
taskManagerInstance.getTaskManagerGateway().getAddress();
    +                                                                   final 
String tmQueryServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManagerInstance.getTaskManagerID().getResourceIdString();
    +
    +                                                                   
retrieveAndQueryMetrics(tmQueryServicePath);
    +
    +                                                                   return 
taskManagerInstance.getId().toString();
    +                                                           
}).collect(Collectors.toList());
    +
    +                                                   synchronized (metrics) {
                                                                
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
                                                        }
                                                }
    -                                   }, ctx);
    -                           logErrorOnFailure(registeredTaskManagersFuture, 
"Fetchin list of registered TaskManagers failed.");
    +                                   },
    +                                   executor);
                        }
                } catch (Exception e) {
                        LOG.warn("Exception while fetching metrics.", e);
                }
        }
     
    -   private void logErrorOnFailure(Future<Object> future, final String 
message) {
    -           future.onFailure(new OnFailure() {
    -                   @Override
    -                   public void onFailure(Throwable failure) throws 
Throwable {
    -                           LOG.debug(message, failure);
    -                   }
    -           }, ctx);
    -   }
    -
        /**
    -    * Requests a metric dump from the given actor.
    +    * Retrieves and queries the specified QueryServiceGateway.
         *
    -    * @param actor ActorRef to request the dump from
    -     */
    -   private void queryMetrics(ActorRef actor) {
    -           Future<Object> metricQueryFuture = new 
BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
    -           metricQueryFuture
    -                   .onSuccess(new OnSuccess<Object>() {
    -                           @Override
    -                           public void onSuccess(Object result) throws 
Throwable {
    -                                   addMetrics(result);
    +    * @param queryServicePath specifying the QueryServiceGateway
    +    */
    +   private void retrieveAndQueryMetrics(String queryServicePath) {
    +           final CompletableFuture<MetricQueryServiceGateway> 
jmQueryServiceGatewayFuture = 
queryServiceRetriever.retrieveService(queryServicePath);
    --- End diff --
    
    The variable name `jmQueryServiceGatewayFuture` is misleading, as this method
is also called to retrieve a TaskManager query service, not only the JobManager's.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to