xintongsong commented on code in PR #21132:
URL: https://github.com/apache/flink/pull/21132#discussion_r1003232260
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -50,9 +50,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
Review Comment:
I think there is more to be changed for the JUnit migration. @reswqa, could
you help take a look at this?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##########
@@ -70,6 +71,8 @@
private long lastUpdateTime;
+ private CompletableFuture<Void> fetchMetricsFuture =
CompletableFuture.completedFuture(null);
Review Comment:
`@GuardedBy("this")`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##########
@@ -148,97 +154,117 @@ private void fetchMetrics() {
},
executor);
- CompletableFuture<Collection<String>>
queryServiceAddressesFuture =
-
leaderGateway.requestMetricQueryServiceAddresses(timeout);
-
- queryServiceAddressesFuture.whenCompleteAsync(
- (Collection<String> queryServiceAddresses, Throwable
throwable) -> {
- if (throwable != null) {
- LOG.debug("Requesting paths for query services
failed.", throwable);
- } else {
- for (String queryServiceAddress :
queryServiceAddresses) {
-
retrieveAndQueryMetrics(queryServiceAddress);
- }
- }
- },
- executor);
-
- // TODO: Once the old code has been ditched, remove the
explicit TaskManager query
- // service discovery
- // TODO: and return it as part of
requestMetricQueryServiceAddresses. Moreover,
- // change the MetricStore such that
- // TODO: we don't have to explicitly retain the valid
TaskManagers, e.g. letting it
- // be a cache with expiry time
- CompletableFuture<Collection<Tuple2<ResourceID, String>>>
- taskManagerQueryServiceGatewaysFuture =
-
leaderGateway.requestTaskManagerMetricQueryServiceAddresses(
- timeout);
-
- taskManagerQueryServiceGatewaysFuture.whenCompleteAsync(
- (Collection<Tuple2<ResourceID, String>>
queryServiceGateways,
- Throwable throwable) -> {
- if (throwable != null) {
- LOG.debug(
- "Requesting TaskManager's path for
query services failed.",
- throwable);
- } else {
- List<String> taskManagersToRetain =
- queryServiceGateways.stream()
- .map(
- (Tuple2<ResourceID,
String> tuple) -> {
-
queryServiceRetriever
-
.retrieveService(tuple.f1)
-
.thenAcceptAsync(
-
this::queryMetrics,
-
executor);
- return
tuple.f0.getResourceIdString();
- })
- .collect(Collectors.toList());
-
-
metrics.retainTaskManagers(taskManagersToRetain);
- }
- },
- executor);
+ List<CompletableFuture<Void>> waitingMetricsFutures = new
ArrayList<>();
+ waitingMetricsFutures.add(queryJmMetricsFuture(leaderGateway));
+ waitingMetricsFutures.add(queryTmMetricsFuture(leaderGateway));
+ return FutureUtils.waitForAll(waitingMetricsFutures);
Review Comment:
I'd suggest adding a `whenCompleteAsync` before returning the future, to add
some debug logs if the future is completed exceptionally. This aligns with the
previous behavior (the error stack should suggest which step has failed).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##########
@@ -148,97 +154,117 @@ private void fetchMetrics() {
},
executor);
- CompletableFuture<Collection<String>>
queryServiceAddressesFuture =
-
leaderGateway.requestMetricQueryServiceAddresses(timeout);
-
- queryServiceAddressesFuture.whenCompleteAsync(
- (Collection<String> queryServiceAddresses, Throwable
throwable) -> {
- if (throwable != null) {
- LOG.debug("Requesting paths for query services
failed.", throwable);
- } else {
- for (String queryServiceAddress :
queryServiceAddresses) {
-
retrieveAndQueryMetrics(queryServiceAddress);
- }
- }
- },
- executor);
-
- // TODO: Once the old code has been ditched, remove the
explicit TaskManager query
- // service discovery
- // TODO: and return it as part of
requestMetricQueryServiceAddresses. Moreover,
- // change the MetricStore such that
- // TODO: we don't have to explicitly retain the valid
TaskManagers, e.g. letting it
- // be a cache with expiry time
- CompletableFuture<Collection<Tuple2<ResourceID, String>>>
- taskManagerQueryServiceGatewaysFuture =
-
leaderGateway.requestTaskManagerMetricQueryServiceAddresses(
- timeout);
-
- taskManagerQueryServiceGatewaysFuture.whenCompleteAsync(
- (Collection<Tuple2<ResourceID, String>>
queryServiceGateways,
- Throwable throwable) -> {
- if (throwable != null) {
- LOG.debug(
- "Requesting TaskManager's path for
query services failed.",
- throwable);
- } else {
- List<String> taskManagersToRetain =
- queryServiceGateways.stream()
- .map(
- (Tuple2<ResourceID,
String> tuple) -> {
-
queryServiceRetriever
-
.retrieveService(tuple.f1)
-
.thenAcceptAsync(
-
this::queryMetrics,
-
executor);
- return
tuple.f0.getResourceIdString();
- })
- .collect(Collectors.toList());
-
-
metrics.retainTaskManagers(taskManagersToRetain);
- }
- },
- executor);
+ List<CompletableFuture<Void>> waitingMetricsFutures = new
ArrayList<>();
+ waitingMetricsFutures.add(queryJmMetricsFuture(leaderGateway));
+ waitingMetricsFutures.add(queryTmMetricsFuture(leaderGateway));
+ return FutureUtils.waitForAll(waitingMetricsFutures);
}
} catch (Exception e) {
LOG.debug("Exception while fetching metrics.", e);
}
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private CompletableFuture<Void> queryJmMetricsFuture(T leaderGateway) {
+ CompletableFuture<Collection<String>> queryServiceAddressesFuture =
+ leaderGateway.requestMetricQueryServiceAddresses(timeout);
+
+ return queryServiceAddressesFuture.thenComposeAsync(
+ (Collection<String> queryServiceAddresses) -> {
+ if (queryServiceAddresses.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ List<CompletableFuture<Void>> queryMetricFutures = new
ArrayList<>();
+ for (String queryServiceAddress : queryServiceAddresses) {
+
queryMetricFutures.add(retrieveAndQueryMetrics(queryServiceAddress));
+ }
+ return FutureUtils.waitForAll(queryMetricFutures);
+ },
+ executor);
+ }
+
+ private CompletableFuture<Void> queryTmMetricsFuture(T leaderGateway) {
+ // TODO: Once the old code has been ditched, remove the explicit
TaskManager query
+ // service discovery
+ // TODO: and return it as part of requestMetricQueryServiceAddresses.
Moreover,
+ // change the MetricStore such that
+ // TODO: we don't have to explicitly retain the valid TaskManagers,
e.g. letting it
+ // be a cache with expiry time
+ CompletableFuture<Collection<Tuple2<ResourceID, String>>>
+ taskManagerQueryServiceGatewaysFuture =
+
leaderGateway.requestTaskManagerMetricQueryServiceAddresses(timeout);
+
+ return taskManagerQueryServiceGatewaysFuture.thenComposeAsync(
+ (Collection<Tuple2<ResourceID, String>> queryServiceGateways)
-> {
+ if (queryServiceGateways.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ List<CompletableFuture<Void>> queryMetricFutures = new
ArrayList<>();
+ List<String> taskManagersToRetain =
+ queryServiceGateways.stream()
+ .map(
+ (Tuple2<ResourceID, String> tuple)
-> {
+ CompletableFuture<Void>
finishQueryMetricsFuture =
+ new
CompletableFuture<>();
+ queryServiceRetriever
+
.retrieveService(tuple.f1)
+ .thenAcceptAsync(
+
queryServiceGateway ->
+
queryMetrics(
+
queryServiceGateway,
+
finishQueryMetricsFuture),
+ executor);
+
queryMetricFutures.add(finishQueryMetricsFuture);
+ return
tuple.f0.getResourceIdString();
+ })
+ .collect(Collectors.toList());
+
+ metrics.retainTaskManagers(taskManagersToRetain);
+ return FutureUtils.waitForAll(queryMetricFutures);
+ },
+ executor);
}
/**
* Retrieves and queries the specified QueryServiceGateway.
*
* @param queryServiceAddress specifying the QueryServiceGateway
*/
- private void retrieveAndQueryMetrics(String queryServiceAddress) {
+ private CompletableFuture<Void> retrieveAndQueryMetrics(String
queryServiceAddress) {
LOG.debug("Retrieve metric query service gateway for {}",
queryServiceAddress);
final CompletableFuture<MetricQueryServiceGateway>
queryServiceGatewayFuture =
queryServiceRetriever.retrieveService(queryServiceAddress);
+ final CompletableFuture<Void> finishQueryMetricsFuture = new
CompletableFuture<>();
queryServiceGatewayFuture.whenCompleteAsync(
(MetricQueryServiceGateway queryServiceGateway, Throwable t)
-> {
if (t != null) {
+ finishQueryMetricsFuture.complete(null);
Review Comment:
We can make `queryMetrics` return a `CompletableFuture`, which can be
connected to `queryServiceGatewayFuture` as the next stage.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##########
@@ -70,6 +71,8 @@
private long lastUpdateTime;
+ private CompletableFuture<Void> fetchMetricsFuture =
CompletableFuture.completedFuture(null);
Review Comment:
Use `FutureUtils.completedVoidFuture()` to avoid frequently creating new
objects. Same for other occurrences.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]