This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c52e1519b509b6a918df037fe3665872ed8146fb
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Oct 21 17:40:42 2022 +0800

    [FLINK-29134][metrics] Do not repeatedly add useless metric updating tasks to avoid wasting resources
    
    This closes #21132.
---
 .../handler/legacy/metrics/MetricFetcherImpl.java  | 148 ++++++++-------
 .../handler/legacy/metrics/MetricFetcherTest.java  | 210 ++++++++++++++-------
 2 files changed, 227 insertions(+), 131 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index 65d2e162532..6a7c908e927 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -32,11 +32,13 @@ import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -68,8 +70,12 @@ public class MetricFetcherImpl<T extends RestfulGateway> 
implements MetricFetche
     private final MetricDumpDeserializer deserializer = new 
MetricDumpDeserializer();
     private final long updateInterval;
 
+    @GuardedBy("this")
     private long lastUpdateTime;
 
+    @GuardedBy("this")
+    private CompletableFuture<Void> fetchMetricsFuture = 
FutureUtils.completedVoidFuture();
+
     public MetricFetcherImpl(
             GatewayRetriever<T> retriever,
             MetricQueryServiceRetriever queryServiceRetriever,
@@ -104,9 +110,12 @@ public class MetricFetcherImpl<T extends RestfulGateway> 
implements MetricFetche
     public void update() {
         synchronized (this) {
             long currentTime = System.currentTimeMillis();
-            if (currentTime - lastUpdateTime > updateInterval) {
+            // Before all querying metric tasks are completed, new metric 
updating tasks cannot
+            // be added. This is to avoid resource waste or other problems, 
such as OOM, caused by
+            // adding too many useless querying tasks. See FLINK-29134.
+            if (currentTime - lastUpdateTime > updateInterval && 
fetchMetricsFuture.isDone()) {
                 lastUpdateTime = currentTime;
-                fetchMetrics();
+                fetchMetricsFuture = fetchMetrics();
             }
         }
     }
@@ -118,7 +127,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> 
implements MetricFetche
         }
     }
 
-    private void fetchMetrics() {
+    private CompletableFuture<Void> fetchMetrics() {
         LOG.debug("Start fetching metrics.");
 
         try {
@@ -148,61 +157,79 @@ public class MetricFetcherImpl<T extends RestfulGateway> 
implements MetricFetche
                         },
                         executor);
 
-                CompletableFuture<Collection<String>> 
queryServiceAddressesFuture =
-                        
leaderGateway.requestMetricQueryServiceAddresses(timeout);
-
-                queryServiceAddressesFuture.whenCompleteAsync(
-                        (Collection<String> queryServiceAddresses, Throwable 
throwable) -> {
+                List<CompletableFuture<Void>> waitingMetricsFutures = new 
ArrayList<>();
+                CompletableFuture<Void> jmMetricsFuture = 
queryJmMetricsFuture(leaderGateway);
+                waitingMetricsFutures.add(jmMetricsFuture);
+                jmMetricsFuture.whenCompleteAsync(
+                        (ignore, throwable) -> {
                             if (throwable != null) {
-                                LOG.debug("Requesting paths for query services 
failed.", throwable);
-                            } else {
-                                for (String queryServiceAddress : 
queryServiceAddresses) {
-                                    
retrieveAndQueryMetrics(queryServiceAddress);
-                                }
+                                LOG.debug("Failed to fetch the leader's 
metrics.", throwable);
                             }
                         },
                         executor);
-
-                // TODO: Once the old code has been ditched, remove the 
explicit TaskManager query
-                // service discovery
-                // TODO: and return it as part of 
requestMetricQueryServiceAddresses. Moreover,
-                // change the MetricStore such that
-                // TODO: we don't have to explicitly retain the valid 
TaskManagers, e.g. letting it
-                // be a cache with expiry time
-                CompletableFuture<Collection<Tuple2<ResourceID, String>>>
-                        taskManagerQueryServiceGatewaysFuture =
-                                
leaderGateway.requestTaskManagerMetricQueryServiceAddresses(
-                                        timeout);
-
-                taskManagerQueryServiceGatewaysFuture.whenCompleteAsync(
-                        (Collection<Tuple2<ResourceID, String>> 
queryServiceGateways,
-                                Throwable throwable) -> {
+                CompletableFuture<Void> tmMetricsFuture = 
queryTmMetricsFuture(leaderGateway);
+                waitingMetricsFutures.add(tmMetricsFuture);
+                tmMetricsFuture.whenCompleteAsync(
+                        (ignore, throwable) -> {
                             if (throwable != null) {
-                                LOG.debug(
-                                        "Requesting TaskManager's path for 
query services failed.",
-                                        throwable);
-                            } else {
-                                List<String> taskManagersToRetain =
-                                        queryServiceGateways.stream()
-                                                .map(
-                                                        (Tuple2<ResourceID, 
String> tuple) -> {
-                                                            
queryServiceRetriever
-                                                                    
.retrieveService(tuple.f1)
-                                                                    
.thenAcceptAsync(
-                                                                            
this::queryMetrics,
-                                                                            
executor);
-                                                            return 
tuple.f0.getResourceIdString();
-                                                        })
-                                                .collect(Collectors.toList());
-
-                                
metrics.retainTaskManagers(taskManagersToRetain);
+                                LOG.debug("Failed to fetch the TaskManager's 
metrics.", throwable);
                             }
                         },
                         executor);
+                return FutureUtils.waitForAll(waitingMetricsFutures);
             }
         } catch (Exception e) {
             LOG.debug("Exception while fetching metrics.", e);
+            return FutureUtils.completedExceptionally(e);
         }
+        return FutureUtils.completedVoidFuture();
+    }
+
+    private CompletableFuture<Void> queryJmMetricsFuture(T leaderGateway) {
+        CompletableFuture<Collection<String>> queryServiceAddressesFuture =
+                leaderGateway.requestMetricQueryServiceAddresses(timeout);
+        return queryServiceAddressesFuture.thenComposeAsync(
+                (Collection<String> queryServiceAddresses) -> {
+                    List<CompletableFuture<Void>> queryMetricFutures = new 
ArrayList<>();
+                    for (String queryServiceAddress : queryServiceAddresses) {
+                        
queryMetricFutures.add(retrieveAndQueryMetrics(queryServiceAddress));
+                    }
+                    return FutureUtils.waitForAll(queryMetricFutures);
+                },
+                executor);
+    }
+
+    private CompletableFuture<Void> queryTmMetricsFuture(T leaderGateway) {
+        // TODO: Once the old code has been ditched, remove the explicit 
TaskManager query
+        // service discovery
+        // TODO: and return it as part of requestMetricQueryServiceAddresses. 
Moreover,
+        // change the MetricStore such that
+        // TODO: we don't have to explicitly retain the valid TaskManagers, 
e.g. letting it
+        // be a cache with expiry time
+        CompletableFuture<Collection<Tuple2<ResourceID, String>>>
+                taskManagerQueryServiceGatewaysFuture =
+                        
leaderGateway.requestTaskManagerMetricQueryServiceAddresses(timeout);
+        return taskManagerQueryServiceGatewaysFuture.thenComposeAsync(
+                (Collection<Tuple2<ResourceID, String>> queryServiceGateways) 
-> {
+                    List<CompletableFuture<Void>> queryMetricFutures = new 
ArrayList<>();
+                    List<String> taskManagersToRetain =
+                            queryServiceGateways.stream()
+                                    .map(
+                                            (Tuple2<ResourceID, String> tuple) 
-> {
+                                                queryMetricFutures.add(
+                                                        queryServiceRetriever
+                                                                
.retrieveService(tuple.f1)
+                                                                
.thenComposeAsync(
+                                                                        
this::queryMetrics,
+                                                                        
executor));
+                                                return 
tuple.f0.getResourceIdString();
+                                            })
+                                    .collect(Collectors.toList());
+
+                    metrics.retainTaskManagers(taskManagersToRetain);
+                    return FutureUtils.waitForAll(queryMetricFutures);
+                },
+                executor);
     }
 
     /**
@@ -210,21 +237,12 @@ public class MetricFetcherImpl<T extends RestfulGateway> 
implements MetricFetche
      *
      * @param queryServiceAddress specifying the QueryServiceGateway
      */
-    private void retrieveAndQueryMetrics(String queryServiceAddress) {
+    private CompletableFuture<Void> retrieveAndQueryMetrics(String 
queryServiceAddress) {
         LOG.debug("Retrieve metric query service gateway for {}", 
queryServiceAddress);
 
         final CompletableFuture<MetricQueryServiceGateway> 
queryServiceGatewayFuture =
                 queryServiceRetriever.retrieveService(queryServiceAddress);
-
-        queryServiceGatewayFuture.whenCompleteAsync(
-                (MetricQueryServiceGateway queryServiceGateway, Throwable t) 
-> {
-                    if (t != null) {
-                        LOG.debug("Could not retrieve QueryServiceGateway.", 
t);
-                    } else {
-                        queryMetrics(queryServiceGateway);
-                    }
-                },
-                executor);
+        return queryServiceGatewayFuture.thenComposeAsync(this::queryMetrics, 
executor);
     }
 
     /**
@@ -232,18 +250,16 @@ public class MetricFetcherImpl<T extends RestfulGateway> 
implements MetricFetche
      *
      * @param queryServiceGateway to query for metrics
      */
-    private void queryMetrics(final MetricQueryServiceGateway 
queryServiceGateway) {
+    private CompletableFuture<Void> queryMetrics(
+            final MetricQueryServiceGateway queryServiceGateway) {
         LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
 
-        queryServiceGateway
+        return queryServiceGateway
                 .queryMetrics(timeout)
-                .whenCompleteAsync(
-                        (MetricDumpSerialization.MetricSerializationResult 
result, Throwable t) -> {
-                            if (t != null) {
-                                LOG.debug("Fetching metrics failed.", t);
-                            } else {
-                                
metrics.addAll(deserializer.deserialize(result));
-                            }
+                .thenComposeAsync(
+                        (MetricDumpSerialization.MetricSerializationResult 
result) -> {
+                            metrics.addAll(deserializer.deserialize(result));
+                            return FutureUtils.completedVoidFuture();
                         },
                         executor);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 27ce10cf128..b8ad6f2dec8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -39,14 +39,18 @@ import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -57,75 +61,18 @@ class MetricFetcherTest {
     @Test
     void testUpdate() {
         final Time timeout = Time.seconds(10L);
-
-        // ========= setup TaskManager
-        // 
=================================================================================
-
         JobID jobID = new JobID();
         ResourceID tmRID = ResourceID.generate();
 
-        // ========= setup QueryServices
-        // 
================================================================================
-
-        final MetricQueryServiceGateway jmQueryService =
-                new TestingMetricQueryServiceGateway.Builder()
-                        .setQueryMetricsSupplier(
-                                () ->
-                                        CompletableFuture.completedFuture(
-                                                new MetricDumpSerialization
-                                                        
.MetricSerializationResult(
-                                                        new byte[0],
-                                                        new byte[0],
-                                                        new byte[0],
-                                                        new byte[0],
-                                                        0,
-                                                        0,
-                                                        0,
-                                                        0)))
-                        .build();
-
-        MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer 
=
-                createRequestDumpAnswer(tmRID, jobID);
-        final MetricQueryServiceGateway tmQueryService =
-                new TestingMetricQueryServiceGateway.Builder()
-                        .setQueryMetricsSupplier(
-                                () -> 
CompletableFuture.completedFuture(requestMetricsAnswer))
-                        .build();
-
-        // ========= setup JobManager
-        // 
==================================================================================
-
-        final TestingRestfulGateway restfulGateway =
-                new TestingRestfulGateway.Builder()
-                        .setRequestMultipleJobDetailsSupplier(
-                                () ->
-                                        CompletableFuture.completedFuture(
-                                                new 
MultipleJobsDetails(Collections.emptyList())))
-                        .setRequestMetricQueryServiceGatewaysSupplier(
-                                () ->
-                                        CompletableFuture.completedFuture(
-                                                
Collections.singleton(jmQueryService.getAddress())))
-                        
.setRequestTaskManagerMetricQueryServiceGatewaysSupplier(
-                                () ->
-                                        CompletableFuture.completedFuture(
-                                                Collections.singleton(
-                                                        Tuple2.of(
-                                                                tmRID,
-                                                                
tmQueryService.getAddress()))))
-                        .build();
-
-        final GatewayRetriever<RestfulGateway> retriever =
-                () -> CompletableFuture.completedFuture(restfulGateway);
-
-        // ========= start MetricFetcher testing
-        // 
=======================================================================
+        // Create metric fetcher
         MetricFetcher fetcher =
-                new MetricFetcherImpl<>(
-                        retriever,
-                        address -> 
CompletableFuture.completedFuture(tmQueryService),
-                        Executors.directExecutor(),
+                createMetricFetcherWithServiceGateways(
+                        jobID,
+                        tmRID,
                         timeout,
-                        
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
+                        
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue(),
+                        0,
+                        null);
 
         // verify that update fetches metrics and updates the store
         fetcher.update();
@@ -271,6 +218,139 @@ class MetricFetcherTest {
         assertThat(requestMetricQueryServiceGatewaysCounter).hasValue(2);
     }
 
+    @Test
+    void testIgnoreUpdateRequestWhenFetchingMetrics() throws 
InterruptedException {
+        final long updateInterval = 1000L;
+        final long waitTimeBeforeReturnMetricResults = updateInterval * 2;
+        final Time timeout = Time.seconds(10L);
+        final AtomicInteger requestMetricQueryServiceGatewaysCounter = new 
AtomicInteger(0);
+        final JobID jobID = new JobID();
+        final ResourceID tmRID = ResourceID.generate();
+
+        // Create metric fetcher
+        final MetricFetcher fetcher =
+                createMetricFetcherWithServiceGateways(
+                        jobID,
+                        tmRID,
+                        timeout,
+                        updateInterval,
+                        waitTimeBeforeReturnMetricResults,
+                        requestMetricQueryServiceGatewaysCounter);
+
+        fetcher.update();
+
+        final long start = System.currentTimeMillis();
+        long difference = 0L;
+
+        while (difference <= updateInterval) {
+            Thread.sleep((int) (updateInterval * 1.5f));
+            difference = System.currentTimeMillis() - start;
+        }
+
+        fetcher.update();
+
+        assertThat(requestMetricQueryServiceGatewaysCounter).hasValue(1);
+    }
+
+    private MetricFetcher createMetricFetcherWithServiceGateways(
+            JobID jobID,
+            ResourceID tmRID,
+            Time timeout,
+            long updateInterval,
+            long waitTimeBeforeReturnMetricResults,
+            @Nullable AtomicInteger requestMetricQueryServiceGatewaysCounter) {
+        final ExecutorService executor = 
java.util.concurrent.Executors.newSingleThreadExecutor();
+        // ========= setup QueryServices
+        // 
================================================================================
+
+        final MetricQueryServiceGateway jmQueryService =
+                new TestingMetricQueryServiceGateway.Builder()
+                        .setQueryMetricsSupplier(
+                                () ->
+                                        CompletableFuture.completedFuture(
+                                                new MetricDumpSerialization
+                                                        
.MetricSerializationResult(
+                                                        new byte[0],
+                                                        new byte[0],
+                                                        new byte[0],
+                                                        new byte[0],
+                                                        0,
+                                                        0,
+                                                        0,
+                                                        0)))
+                        .build();
+
+        MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer 
=
+                createRequestDumpAnswer(tmRID, jobID);
+        final MetricQueryServiceGateway tmQueryService =
+                new TestingMetricQueryServiceGateway.Builder()
+                        .setQueryMetricsSupplier(
+                                () -> {
+                                    if (waitTimeBeforeReturnMetricResults > 0) 
{
+                                        CompletableFuture<
+                                                        MetricDumpSerialization
+                                                                
.MetricSerializationResult>
+                                                metricsAnswerFuture = new 
CompletableFuture<>();
+                                        FutureUtils.completedVoidFuture()
+                                                .thenComposeAsync(
+                                                        (ignored) -> {
+                                                            try {
+                                                                Thread.sleep(
+                                                                        
waitTimeBeforeReturnMetricResults);
+                                                            } catch 
(InterruptedException ignore) {
+                                                            }
+                                                            
metricsAnswerFuture.complete(
+                                                                    
requestMetricsAnswer);
+                                                            return 
metricsAnswerFuture;
+                                                        },
+                                                        executor);
+                                        return metricsAnswerFuture;
+                                    } else {
+                                        return 
CompletableFuture.completedFuture(
+                                                requestMetricsAnswer);
+                                    }
+                                })
+                        .build();
+
+        // ========= setup JobManager
+        // 
==================================================================================
+
+        final TestingRestfulGateway restfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setRequestMultipleJobDetailsSupplier(
+                                () ->
+                                        CompletableFuture.completedFuture(
+                                                new 
MultipleJobsDetails(Collections.emptyList())))
+                        .setRequestMetricQueryServiceGatewaysSupplier(
+                                () -> {
+                                    if 
(requestMetricQueryServiceGatewaysCounter != null) {
+                                        
requestMetricQueryServiceGatewaysCounter.incrementAndGet();
+                                    }
+                                    return CompletableFuture.completedFuture(
+                                            
Collections.singleton(jmQueryService.getAddress()));
+                                })
+                        
.setRequestTaskManagerMetricQueryServiceGatewaysSupplier(
+                                () ->
+                                        CompletableFuture.completedFuture(
+                                                Collections.singleton(
+                                                        Tuple2.of(
+                                                                tmRID,
+                                                                
tmQueryService.getAddress()))))
+                        .build();
+
+        final GatewayRetriever<RestfulGateway> retriever =
+                () -> CompletableFuture.completedFuture(restfulGateway);
+
+        // ========= start MetricFetcher testing
+        // 
=======================================================================
+        return new MetricFetcherImpl<>(
+                retriever,
+                address -> CompletableFuture.completedFuture(tmQueryService),
+                Executors.directExecutor(),
+                timeout,
+                updateInterval);
+    }
+
     private MetricFetcher createMetricFetcher(long updateInterval, 
RestfulGateway restfulGateway) {
         return new MetricFetcherImpl<>(
                 () -> CompletableFuture.completedFuture(restfulGateway),
@@ -286,7 +366,7 @@ class MetricFetcherTest {
                 .setRequestMetricQueryServiceGatewaysSupplier(
                         () -> {
                             
requestMetricQueryServiceGatewaysCounter.incrementAndGet();
-                            return new CompletableFuture<>();
+                            return CompletableFuture.completedFuture(null);
                         })
                 .build();
     }

Reply via email to