xintongsong commented on code in PR #21132:
URL: https://github.com/apache/flink/pull/21132#discussion_r1003232260


##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -50,9 +50,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;

Review Comment:
   I think there is more to change for the JUnit migration. @reswqa, could you take a look at this?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##########
@@ -70,6 +71,8 @@
 
     private long lastUpdateTime;
 
+    private CompletableFuture<Void> fetchMetricsFuture = CompletableFuture.completedFuture(null);

Review Comment:
   Annotate this field with `@GuardedBy("this")`.
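A minimal sketch of what the annotation documents, assuming the field is only ever read and written while holding the monitor of `this`. `GuardedFetcherSketch`, `update()` and the `startFetch` supplier are hypothetical and are not the PR's code. To stay dependency-free, the sketch declares a local stand-in for the JSR-305 annotation `javax.annotation.concurrent.GuardedBy`; the annotation itself only documents the locking contract and is not enforced at runtime.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Local stand-in for javax.annotation.concurrent.GuardedBy so the sketch compiles
// without the JSR-305 dependency. Like the real one, it is documentation only.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.CLASS)
@interface GuardedBy {
    String value();
}

class GuardedFetcherSketch {
    // Hypothetical hook that starts one round of metric fetching.
    private final Supplier<CompletableFuture<Void>> startFetch;

    // Only read or written while holding the monitor of `this`.
    @GuardedBy("this")
    private CompletableFuture<Void> fetchMetricsFuture = CompletableFuture.completedFuture(null);

    GuardedFetcherSketch(Supplier<CompletableFuture<Void>> startFetch) {
        this.startFetch = startFetch;
    }

    // synchronized makes the check-then-replace of the field atomic, so concurrent
    // callers never start a second fetch while one is still in flight.
    synchronized CompletableFuture<Void> update() {
        if (fetchMetricsFuture.isDone()) {
            fetchMetricsFuture = startFetch.get();
        }
        return fetchMetricsFuture;
    }
}
```

Because `update()` hands back the in-flight future, callers arriving during a fetch share its result instead of triggering overlapping requests.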



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##########
@@ -148,97 +154,117 @@ private void fetchMetrics() {
                         },
                         executor);
 
-                CompletableFuture<Collection<String>> queryServiceAddressesFuture =
-                        leaderGateway.requestMetricQueryServiceAddresses(timeout);
-
-                queryServiceAddressesFuture.whenCompleteAsync(
-                        (Collection<String> queryServiceAddresses, Throwable throwable) -> {
-                            if (throwable != null) {
-                                LOG.debug("Requesting paths for query services failed.", throwable);
-                            } else {
-                                for (String queryServiceAddress : queryServiceAddresses) {
-                                    retrieveAndQueryMetrics(queryServiceAddress);
-                                }
-                            }
-                        },
-                        executor);
-
-                // TODO: Once the old code has been ditched, remove the explicit TaskManager query
-                // service discovery
-                // TODO: and return it as part of requestMetricQueryServiceAddresses. Moreover,
-                // change the MetricStore such that
-                // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it
-                // be a cache with expiry time
-                CompletableFuture<Collection<Tuple2<ResourceID, String>>>
-                        taskManagerQueryServiceGatewaysFuture =
-                                leaderGateway.requestTaskManagerMetricQueryServiceAddresses(
-                                        timeout);
-
-                taskManagerQueryServiceGatewaysFuture.whenCompleteAsync(
-                        (Collection<Tuple2<ResourceID, String>> queryServiceGateways,
-                                Throwable throwable) -> {
-                            if (throwable != null) {
-                                LOG.debug(
-                                        "Requesting TaskManager's path for query services failed.",
-                                        throwable);
-                            } else {
-                                List<String> taskManagersToRetain =
-                                        queryServiceGateways.stream()
-                                                .map(
-                                                        (Tuple2<ResourceID, String> tuple) -> {
-                                                            queryServiceRetriever
-                                                                    .retrieveService(tuple.f1)
-                                                                    .thenAcceptAsync(
-                                                                            this::queryMetrics,
-                                                                            executor);
-                                                            return tuple.f0.getResourceIdString();
-                                                        })
-                                                .collect(Collectors.toList());
-
-                                metrics.retainTaskManagers(taskManagersToRetain);
-                            }
-                        },
-                        executor);
+                List<CompletableFuture<Void>> waitingMetricsFutures = new ArrayList<>();
+                waitingMetricsFutures.add(queryJmMetricsFuture(leaderGateway));
+                waitingMetricsFutures.add(queryTmMetricsFuture(leaderGateway));
+                return FutureUtils.waitForAll(waitingMetricsFutures);

Review Comment:
   I'd suggest adding a `whenCompleteAsync` stage before returning the future, to emit a debug log if the future completes exceptionally. This aligns with the previous behavior, where each request logged its own failure (the error stack should indicate which step failed).
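A minimal sketch of the suggestion using only the JDK: `CompletableFuture.allOf` stands in for Flink's `FutureUtils.waitForAll`, `java.util.logging` at `FINE` stands in for SLF4J's `LOG.debug`, and `waitForAllAndLog` and its log message are hypothetical. The logging stage does not change the outcome, because the future returned by `whenCompleteAsync` completes the same way as its source, so callers still observe the failure.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

class FetchLoggingSketch {
    private static final Logger LOG = Logger.getLogger(FetchLoggingSketch.class.getName());

    // Waits for all per-step futures and logs at debug level (FINE) if any of them failed.
    // The returned future mirrors the combined one, so the failure still propagates.
    static CompletableFuture<Void> waitForAllAndLog(
            List<CompletableFuture<Void>> futures, Executor executor) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .whenCompleteAsync(
                        (ignored, throwable) -> {
                            if (throwable != null) {
                                // The logged throwable wraps the exception of a failed step as its cause.
                                LOG.log(Level.FINE, "Fetching metrics failed.", throwable);
                            }
                        },
                        executor);
    }
}
```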



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##########
@@ -148,97 +154,117 @@ private void fetchMetrics() {
                         },
                         executor);
 
-                CompletableFuture<Collection<String>> queryServiceAddressesFuture =
-                        leaderGateway.requestMetricQueryServiceAddresses(timeout);
-
-                queryServiceAddressesFuture.whenCompleteAsync(
-                        (Collection<String> queryServiceAddresses, Throwable throwable) -> {
-                            if (throwable != null) {
-                                LOG.debug("Requesting paths for query services failed.", throwable);
-                            } else {
-                                for (String queryServiceAddress : queryServiceAddresses) {
-                                    retrieveAndQueryMetrics(queryServiceAddress);
-                                }
-                            }
-                        },
-                        executor);
-
-                // TODO: Once the old code has been ditched, remove the explicit TaskManager query
-                // service discovery
-                // TODO: and return it as part of requestMetricQueryServiceAddresses. Moreover,
-                // change the MetricStore such that
-                // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it
-                // be a cache with expiry time
-                CompletableFuture<Collection<Tuple2<ResourceID, String>>>
-                        taskManagerQueryServiceGatewaysFuture =
-                                leaderGateway.requestTaskManagerMetricQueryServiceAddresses(
-                                        timeout);
-
-                taskManagerQueryServiceGatewaysFuture.whenCompleteAsync(
-                        (Collection<Tuple2<ResourceID, String>> queryServiceGateways,
-                                Throwable throwable) -> {
-                            if (throwable != null) {
-                                LOG.debug(
-                                        "Requesting TaskManager's path for query services failed.",
-                                        throwable);
-                            } else {
-                                List<String> taskManagersToRetain =
-                                        queryServiceGateways.stream()
-                                                .map(
-                                                        (Tuple2<ResourceID, String> tuple) -> {
-                                                            queryServiceRetriever
-                                                                    .retrieveService(tuple.f1)
-                                                                    .thenAcceptAsync(
-                                                                            this::queryMetrics,
-                                                                            executor);
-                                                            return tuple.f0.getResourceIdString();
-                                                        })
-                                                .collect(Collectors.toList());
-
-                                metrics.retainTaskManagers(taskManagersToRetain);
-                            }
-                        },
-                        executor);
+                List<CompletableFuture<Void>> waitingMetricsFutures = new ArrayList<>();
+                waitingMetricsFutures.add(queryJmMetricsFuture(leaderGateway));
+                waitingMetricsFutures.add(queryTmMetricsFuture(leaderGateway));
+                return FutureUtils.waitForAll(waitingMetricsFutures);
             }
         } catch (Exception e) {
             LOG.debug("Exception while fetching metrics.", e);
         }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private CompletableFuture<Void> queryJmMetricsFuture(T leaderGateway) {
+        CompletableFuture<Collection<String>> queryServiceAddressesFuture =
+                leaderGateway.requestMetricQueryServiceAddresses(timeout);
+
+        return queryServiceAddressesFuture.thenComposeAsync(
+                (Collection<String> queryServiceAddresses) -> {
+                    if (queryServiceAddresses.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    List<CompletableFuture<Void>> queryMetricFutures = new ArrayList<>();
+                    for (String queryServiceAddress : queryServiceAddresses) {
+                        queryMetricFutures.add(retrieveAndQueryMetrics(queryServiceAddress));
+                    }
+                    return FutureUtils.waitForAll(queryMetricFutures);
+                },
+                executor);
+    }
+
+    private CompletableFuture<Void> queryTmMetricsFuture(T leaderGateway) {
+        // TODO: Once the old code has been ditched, remove the explicit TaskManager query
+        // service discovery
+        // TODO: and return it as part of requestMetricQueryServiceAddresses. Moreover,
+        // change the MetricStore such that
+        // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it
+        // be a cache with expiry time
+        CompletableFuture<Collection<Tuple2<ResourceID, String>>>
+                taskManagerQueryServiceGatewaysFuture =
+                        leaderGateway.requestTaskManagerMetricQueryServiceAddresses(timeout);
+
+        return taskManagerQueryServiceGatewaysFuture.thenComposeAsync(
+                (Collection<Tuple2<ResourceID, String>> queryServiceGateways) -> {
+                    if (queryServiceGateways.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    List<CompletableFuture<Void>> queryMetricFutures = new ArrayList<>();
+                    List<String> taskManagersToRetain =
+                            queryServiceGateways.stream()
+                                    .map(
+                                            (Tuple2<ResourceID, String> tuple) -> {
+                                                CompletableFuture<Void> finishQueryMetricsFuture =
+                                                        new CompletableFuture<>();
+                                                queryServiceRetriever
+                                                        .retrieveService(tuple.f1)
+                                                        .thenAcceptAsync(
+                                                                queryServiceGateway ->
+                                                                        queryMetrics(
+                                                                                queryServiceGateway,
+                                                                                finishQueryMetricsFuture),
+                                                                executor);
+                                                queryMetricFutures.add(finishQueryMetricsFuture);
+                                                return tuple.f0.getResourceIdString();
+                                            })
+                                    .collect(Collectors.toList());
+
+                    metrics.retainTaskManagers(taskManagersToRetain);
+                    return FutureUtils.waitForAll(queryMetricFutures);
+                },
+                executor);
     }
 
     /**
      * Retrieves and queries the specified QueryServiceGateway.
      *
      * @param queryServiceAddress specifying the QueryServiceGateway
      */
-    private void retrieveAndQueryMetrics(String queryServiceAddress) {
+    private CompletableFuture<Void> retrieveAndQueryMetrics(String queryServiceAddress) {
         LOG.debug("Retrieve metric query service gateway for {}", queryServiceAddress);
 
         final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture =
                 queryServiceRetriever.retrieveService(queryServiceAddress);
+        final CompletableFuture<Void> finishQueryMetricsFuture = new CompletableFuture<>();
 
         queryServiceGatewayFuture.whenCompleteAsync(
                 (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
                     if (t != null) {
+                        finishQueryMetricsFuture.complete(null);

Review Comment:
   We can make `queryMetrics` return a `CompletableFuture`, which can then be chained onto `queryServiceGatewayFuture` as the next stage.
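A minimal sketch of that shape, assuming `queryMetrics` is changed to return `CompletableFuture<Void>`. `Gateway`, the `retriever` function and the in-memory `store` are hypothetical stand-ins for `MetricQueryServiceGateway`, `queryServiceRetriever.retrieveService` and the metric store. The per-address future comes from chaining stages with `thenComposeAsync`, so no `finishQueryMetricsFuture` has to be created and completed by hand on every path.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Function;

class ChainedQuerySketch {
    // Hypothetical stand-in for MetricQueryServiceGateway.
    interface Gateway {
        CompletableFuture<String> queryMetrics();
    }

    // Hypothetical stand-in for queryServiceRetriever.retrieveService(address).
    private final Function<String, CompletableFuture<Gateway>> retriever;
    private final Executor executor;
    // Hypothetical stand-in for the metric store.
    final List<String> store = new CopyOnWriteArrayList<>();

    ChainedQuerySketch(Function<String, CompletableFuture<Gateway>> retriever, Executor executor) {
        this.retriever = retriever;
        this.executor = executor;
    }

    CompletableFuture<Void> retrieveAndQueryMetrics(String address) {
        return retriever
                .apply(address)
                // Next stage: query the retrieved gateway; its future becomes this one's result.
                .thenComposeAsync(this::queryMetrics, executor)
                // Treat a failed retrieval or query as finished, like complete(null) in the PR.
                .exceptionally(throwable -> null);
    }

    // Returns a future instead of completing one passed in by the caller.
    private CompletableFuture<Void> queryMetrics(Gateway gateway) {
        return gateway.queryMetrics().thenAcceptAsync(store::add, executor);
    }
}
```

The final `exceptionally` stage keeps the PR's behavior of treating a failed query as done, so one unreachable query service does not fail the aggregated future; dropping it would propagate the error instead.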



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##########
@@ -70,6 +71,8 @@
 
     private long lastUpdateTime;
 
+    private CompletableFuture<Void> fetchMetricsFuture = CompletableFuture.completedFuture(null);

Review Comment:
   Use `FutureUtils.completedVoidFuture()` to avoid repeatedly creating new objects. The same applies to the other occurrences.
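A minimal sketch of the idea behind the suggestion, not Flink's actual source: one already-completed `CompletableFuture<Void>` is created once and handed out on every call. `VoidFutures` is a hypothetical holder class. Sharing the instance is safe because a completed future cannot be completed again (`complete(...)` is a no-op returning `false`); the one caveat is `obtrudeValue`/`obtrudeException`, which can overwrite even a completed future, so callers must not use them on the shared instance.

```java
import java.util.concurrent.CompletableFuture;

final class VoidFutures {
    // Created once; every caller receives this same, already-completed instance.
    private static final CompletableFuture<Void> COMPLETED_VOID_FUTURE =
            CompletableFuture.completedFuture(null);

    private VoidFutures() {}

    static CompletableFuture<Void> completedVoidFuture() {
        return COMPLETED_VOID_FUTURE;
    }
}
```

At a call site such as `return CompletableFuture.completedFuture(null);` this becomes `return VoidFutures.completedVoidFuture();`, which allocates nothing.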



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
