[ 
https://issues.apache.org/jira/browse/FLINK-7381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119662#comment-16119662
 ] 

ASF GitHub Bot commented on FLINK-7381:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132141995
  
    --- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 ---
    @@ -101,127 +98,112 @@ public void update() {
     
        private void fetchMetrics() {
                try {
    -                   Option<scala.Tuple2<ActorGateway, Integer>> 
jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    -                   if (jobManagerGatewayAndWebPort.isDefined()) {
    -                           ActorGateway jobManager = 
jobManagerGatewayAndWebPort.get()._1();
    +                   Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getJobManagerGatewayNow();
    +                   if (optJobManagerGateway.isPresent()) {
    +                           final JobManagerGateway jobManagerGateway = 
optJobManagerGateway.get();
     
                                /**
                                 * Remove all metrics that belong to a job that 
is not running and no longer archived.
                                 */
    -                           Future<Object> jobDetailsFuture = 
jobManager.ask(new RequestJobDetails(true, true), timeout);
    -                           jobDetailsFuture
    -                                   .onSuccess(new OnSuccess<Object>() {
    -                                           @Override
    -                                           public void onSuccess(Object 
result) throws Throwable {
    -                                                   MultipleJobsDetails 
details = (MultipleJobsDetails) result;
    +                           CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
    +
    +                           jobDetailsFuture.whenCompleteAsync(
    +                                   (MultipleJobsDetails jobDetails, 
Throwable throwable) -> {
    +                                           if (throwable != null) {
    +                                                   LOG.debug("Fetching of 
JobDetails failed.", throwable);
    +                                           } else {
                                                        ArrayList<String> 
toRetain = new ArrayList<>();
    -                                                   for (JobDetails job : 
details.getRunningJobs()) {
    +                                                   for (JobDetails job : 
jobDetails.getRunningJobs()) {
                                                                
toRetain.add(job.getJobId().toString());
                                                        }
    -                                                   for (JobDetails job : 
details.getFinishedJobs()) {
    +                                                   for (JobDetails job : 
jobDetails.getFinishedJobs()) {
                                                                
toRetain.add(job.getJobId().toString());
                                                        }
                                                        synchronized (metrics) {
                                                                
metrics.jobs.keySet().retainAll(toRetain);
                                                        }
                                                }
    -                                   }, ctx);
    -                           logErrorOnFailure(jobDetailsFuture, "Fetching 
of JobDetails failed.");
    +                                   },
    +                                   executor);
     
    -                           String jobManagerPath = jobManager.path();
    -                           String queryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
    -                           ActorRef jobManagerQueryService = 
actorSystem.actorFor(queryServicePath);
    +                           String jobManagerPath = 
jobManagerGateway.getAddress();
    +                           String jmQueryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
     
    -                           queryMetrics(jobManagerQueryService);
    +                           retrieveAndQueryMetrics(jmQueryServicePath);
     
                                /**
                                 * We first request the list of all registered 
task managers from the job manager, and then
                                 * request the respective metric dump from each 
task manager.
                                 *
                                 * <p>All stored metrics that do not belong to 
a registered task manager will be removed.
                                 */
    -                           Future<Object> registeredTaskManagersFuture = 
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
    -                           registeredTaskManagersFuture
    -                                   .onSuccess(new OnSuccess<Object>() {
    -                                           @Override
    -                                           public void onSuccess(Object 
result) throws Throwable {
    -                                                   Iterable<Instance> 
taskManagers = ((JobManagerMessages.RegisteredTaskManagers) 
result).asJavaIterable();
    -                                                   List<String> 
activeTaskManagers = new ArrayList<>();
    -                                                   for (Instance 
taskManager : taskManagers) {
    -                                                           
activeTaskManagers.add(taskManager.getId().toString());
    -
    -                                                           String 
taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
    -                                                           String 
queryServicePath = taskManagerPath.substring(0, 
taskManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManager.getTaskManagerID().getResourceIdString();
    -                                                           ActorRef 
taskManagerQueryService = actorSystem.actorFor(queryServicePath);
    -
    -                                                           
queryMetrics(taskManagerQueryService);
    -                                                   }
    -                                                   synchronized (metrics) 
{ // remove all metrics belonging to unregistered task managers
    +                           CompletableFuture<Collection<Instance>> 
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
    +
    +                           taskManagersFuture.whenCompleteAsync(
    +                                   (Collection<Instance> taskManagers, 
Throwable throwable) -> {
    +                                           if (throwable != null) {
    +                                                   LOG.debug("Fetching 
list of registered TaskManagers failed.", throwable);
    +                                           } else {
    +                                                   List<String> 
activeTaskManagers = taskManagers.stream().map(
    +                                                           
taskManagerInstance -> {
    +                                                                   final 
String taskManagerAddress = 
taskManagerInstance.getTaskManagerGateway().getAddress();
    +                                                                   final 
String tmQueryServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManagerInstance.getTaskManagerID().getResourceIdString();
    +
    +                                                                   
retrieveAndQueryMetrics(tmQueryServicePath);
    +
    +                                                                   return 
taskManagerInstance.getId().toString();
    +                                                           
}).collect(Collectors.toList());
    +
    +                                                   synchronized (metrics) {
                                                                
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
                                                        }
                                                }
    -                                   }, ctx);
    -                           logErrorOnFailure(registeredTaskManagersFuture, 
"Fetchin list of registered TaskManagers failed.");
    +                                   },
    +                                   executor);
                        }
                } catch (Exception e) {
                        LOG.warn("Exception while fetching metrics.", e);
                }
        }
     
    -   private void logErrorOnFailure(Future<Object> future, final String 
message) {
    -           future.onFailure(new OnFailure() {
    -                   @Override
    -                   public void onFailure(Throwable failure) throws 
Throwable {
    -                           LOG.debug(message, failure);
    -                   }
    -           }, ctx);
    -   }
    -
        /**
    -    * Requests a metric dump from the given actor.
    +    * Retrieves and queries the specified QueryServiceGateway.
         *
    -    * @param actor ActorRef to request the dump from
    -     */
    -   private void queryMetrics(ActorRef actor) {
    -           Future<Object> metricQueryFuture = new 
BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
    -           metricQueryFuture
    -                   .onSuccess(new OnSuccess<Object>() {
    -                           @Override
    -                           public void onSuccess(Object result) throws 
Throwable {
    -                                   addMetrics(result);
    +    * @param queryServicePath specifying the QueryServiceGateway
    +    */
    +   private void retrieveAndQueryMetrics(String queryServicePath) {
    +           final CompletableFuture<MetricQueryServiceGateway> 
jmQueryServiceGatewayFuture = 
queryServiceRetriever.retrieveService(queryServicePath);
    --- End diff --
    
    True, an artifact of a refactoring. Will change it.


> Decouple WebRuntimeMonitor from ActorGateway
> --------------------------------------------
>
>                 Key: FLINK-7381
>                 URL: https://issues.apache.org/jira/browse/FLINK-7381
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Webfrontend
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>
> The {{WebRuntimeMonitor}} has a hard-wired dependency on the {{ActorGateway}} 
> in order to communicate with the {{JobManager}}. In order to make it work 
> with the {{JobMaster}} (Flip-6), we have to abstract this dependency away. I 
> propose to add a {{JobManagerGateway}} interface, which can be implemented 
> using Akka for the old {{JobManager}} code. The Flip-6 {{JobMasterGateway}} 
> can then directly inherit from this interface.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to