This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c52e1519b509b6a918df037fe3665872ed8146fb Author: Yuxin Tan <[email protected]> AuthorDate: Fri Oct 21 17:40:42 2022 +0800 [FLINK-29134][metrics] Do not repeatedly add useless metric updating tasks to avoid wasting resources This closes #21132. --- .../handler/legacy/metrics/MetricFetcherImpl.java | 148 ++++++++------- .../handler/legacy/metrics/MetricFetcherTest.java | 210 ++++++++++++++------- 2 files changed, 227 insertions(+), 131 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java index 65d2e162532..6a7c908e927 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java @@ -32,11 +32,13 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; import java.util.ArrayList; import java.util.Collection; @@ -68,8 +70,12 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer(); private final long updateInterval; + @GuardedBy("this") private long lastUpdateTime; + @GuardedBy("this") + private CompletableFuture<Void> fetchMetricsFuture = FutureUtils.completedVoidFuture(); + public MetricFetcherImpl( GatewayRetriever<T> retriever, MetricQueryServiceRetriever queryServiceRetriever, @@ -104,9 +110,12 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche public void update() { synchronized (this) { long currentTime = System.currentTimeMillis(); - if (currentTime - lastUpdateTime > updateInterval) { + // Before all querying metric tasks are completed, new metric updating tasks cannot + // be added. This is to avoid resource waste or other problems, such as OOM, caused by + // adding too many useless querying tasks. See FLINK-29134. + if (currentTime - lastUpdateTime > updateInterval && fetchMetricsFuture.isDone()) { lastUpdateTime = currentTime; - fetchMetrics(); + fetchMetricsFuture = fetchMetrics(); } } } @@ -118,7 +127,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche } } - private void fetchMetrics() { + private CompletableFuture<Void> fetchMetrics() { LOG.debug("Start fetching metrics."); try { @@ -148,61 +157,79 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche }, executor); - CompletableFuture<Collection<String>> queryServiceAddressesFuture = - leaderGateway.requestMetricQueryServiceAddresses(timeout); - - queryServiceAddressesFuture.whenCompleteAsync( - (Collection<String> queryServiceAddresses, Throwable throwable) -> { + List<CompletableFuture<Void>> waitingMetricsFutures = new ArrayList<>(); + CompletableFuture<Void> jmMetricsFuture = queryJmMetricsFuture(leaderGateway); + waitingMetricsFutures.add(jmMetricsFuture); + jmMetricsFuture.whenCompleteAsync( + (ignore, throwable) -> { if (throwable != null) { - LOG.debug("Requesting paths for query services failed.", throwable); - } else { - for (String queryServiceAddress : queryServiceAddresses) { - retrieveAndQueryMetrics(queryServiceAddress); - } + LOG.debug("Failed to fetch the leader's metrics.", throwable); } }, executor); - - // TODO: Once the old code has been ditched, remove the explicit TaskManager query - // service discovery - // TODO: and return it as part of requestMetricQueryServiceAddresses. Moreover, - // change the MetricStore such that - // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it - // be a cache with expiry time - CompletableFuture<Collection<Tuple2<ResourceID, String>>> - taskManagerQueryServiceGatewaysFuture = - leaderGateway.requestTaskManagerMetricQueryServiceAddresses( - timeout); - - taskManagerQueryServiceGatewaysFuture.whenCompleteAsync( - (Collection<Tuple2<ResourceID, String>> queryServiceGateways, - Throwable throwable) -> { + CompletableFuture<Void> tmMetricsFuture = queryTmMetricsFuture(leaderGateway); + waitingMetricsFutures.add(tmMetricsFuture); + tmMetricsFuture.whenCompleteAsync( + (ignore, throwable) -> { if (throwable != null) { - LOG.debug( - "Requesting TaskManager's path for query services failed.", - throwable); - } else { - List<String> taskManagersToRetain = - queryServiceGateways.stream() - .map( - (Tuple2<ResourceID, String> tuple) -> { - queryServiceRetriever - .retrieveService(tuple.f1) - .thenAcceptAsync( - this::queryMetrics, - executor); - return tuple.f0.getResourceIdString(); - }) - .collect(Collectors.toList()); - - metrics.retainTaskManagers(taskManagersToRetain); + LOG.debug("Failed to fetch the TaskManager's metrics.", throwable); } }, executor); + return FutureUtils.waitForAll(waitingMetricsFutures); } } catch (Exception e) { LOG.debug("Exception while fetching metrics.", e); + return FutureUtils.completedExceptionally(e); } + return FutureUtils.completedVoidFuture(); + } + + private CompletableFuture<Void> queryJmMetricsFuture(T leaderGateway) { + CompletableFuture<Collection<String>> queryServiceAddressesFuture = + leaderGateway.requestMetricQueryServiceAddresses(timeout); + return queryServiceAddressesFuture.thenComposeAsync( + (Collection<String> queryServiceAddresses) -> { + List<CompletableFuture<Void>> queryMetricFutures = new ArrayList<>(); + for (String queryServiceAddress : queryServiceAddresses) { + queryMetricFutures.add(retrieveAndQueryMetrics(queryServiceAddress)); + } + return FutureUtils.waitForAll(queryMetricFutures); + }, + executor); + } + + private CompletableFuture<Void> queryTmMetricsFuture(T leaderGateway) { + // TODO: Once the old code has been ditched, remove the explicit TaskManager query + // service discovery + // TODO: and return it as part of requestMetricQueryServiceAddresses. Moreover, + // change the MetricStore such that + // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it + // be a cache with expiry time + CompletableFuture<Collection<Tuple2<ResourceID, String>>> + taskManagerQueryServiceGatewaysFuture = + leaderGateway.requestTaskManagerMetricQueryServiceAddresses(timeout); + return taskManagerQueryServiceGatewaysFuture.thenComposeAsync( + (Collection<Tuple2<ResourceID, String>> queryServiceGateways) -> { + List<CompletableFuture<Void>> queryMetricFutures = new ArrayList<>(); + List<String> taskManagersToRetain = + queryServiceGateways.stream() + .map( + (Tuple2<ResourceID, String> tuple) -> { + queryMetricFutures.add( + queryServiceRetriever + .retrieveService(tuple.f1) + .thenComposeAsync( + this::queryMetrics, + executor)); + return tuple.f0.getResourceIdString(); + }) + .collect(Collectors.toList()); + + metrics.retainTaskManagers(taskManagersToRetain); + return FutureUtils.waitForAll(queryMetricFutures); + }, + executor); } /** @@ -210,21 +237,12 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche * * @param queryServiceAddress specifying the QueryServiceGateway */ - private void retrieveAndQueryMetrics(String queryServiceAddress) { + private CompletableFuture<Void> retrieveAndQueryMetrics(String queryServiceAddress) { LOG.debug("Retrieve metric query service gateway for {}", queryServiceAddress); final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServiceAddress); - - queryServiceGatewayFuture.whenCompleteAsync( - (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> { - if (t != null) { - LOG.debug("Could not retrieve QueryServiceGateway.", t); - } else { - queryMetrics(queryServiceGateway); - } - }, - executor); + return queryServiceGatewayFuture.thenComposeAsync(this::queryMetrics, executor); } /** @@ -232,18 +250,16 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche * * @param queryServiceGateway to query for metrics */ - private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) { + private CompletableFuture<Void> queryMetrics( + final MetricQueryServiceGateway queryServiceGateway) { LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress()); - queryServiceGateway + return queryServiceGateway .queryMetrics(timeout) - .whenCompleteAsync( - (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> { - if (t != null) { - LOG.debug("Fetching metrics failed.", t); - } else { - metrics.addAll(deserializer.deserialize(result)); - } + .thenComposeAsync( + (MetricDumpSerialization.MetricSerializationResult result) -> { + metrics.addAll(deserializer.deserialize(result)); + return FutureUtils.completedVoidFuture(); }, executor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index 27ce10cf128..b8ad6f2dec8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -39,14 +39,18 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.util.concurrent.FutureUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; @@ -57,75 +61,18 @@ class MetricFetcherTest { @Test void testUpdate() { final Time timeout = Time.seconds(10L); - - // ========= setup TaskManager - // ================================================================================= - JobID jobID = new JobID(); ResourceID tmRID = ResourceID.generate(); - // ========= setup QueryServices - // ================================================================================ - - final MetricQueryServiceGateway jmQueryService = - new TestingMetricQueryServiceGateway.Builder() - .setQueryMetricsSupplier( - () -> - CompletableFuture.completedFuture( - new MetricDumpSerialization - .MetricSerializationResult( - new byte[0], - new byte[0], - new byte[0], - new byte[0], - 0, - 0, - 0, - 0))) - .build(); - - MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = - createRequestDumpAnswer(tmRID, jobID); - final MetricQueryServiceGateway tmQueryService = - new TestingMetricQueryServiceGateway.Builder() - .setQueryMetricsSupplier( - () -> CompletableFuture.completedFuture(requestMetricsAnswer)) - .build(); - - // ========= setup JobManager - // ================================================================================== - - final TestingRestfulGateway restfulGateway = - new TestingRestfulGateway.Builder() - .setRequestMultipleJobDetailsSupplier( - () -> - CompletableFuture.completedFuture( - new MultipleJobsDetails(Collections.emptyList()))) - .setRequestMetricQueryServiceGatewaysSupplier( - () -> - CompletableFuture.completedFuture( - Collections.singleton(jmQueryService.getAddress()))) - .setRequestTaskManagerMetricQueryServiceGatewaysSupplier( - () -> - CompletableFuture.completedFuture( - Collections.singleton( - Tuple2.of( - tmRID, - tmQueryService.getAddress())))) - .build(); - - final GatewayRetriever<RestfulGateway> retriever = - () -> CompletableFuture.completedFuture(restfulGateway); - - // ========= start MetricFetcher testing - // ======================================================================= + // Create metric fetcher MetricFetcher fetcher = - new MetricFetcherImpl<>( - retriever, - address -> CompletableFuture.completedFuture(tmQueryService), - Executors.directExecutor(), + createMetricFetcherWithServiceGateways( + jobID, + tmRID, timeout, - MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()); + MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue(), + 0, + null); // verify that update fetches metrics and updates the store fetcher.update(); @@ -271,6 +218,139 @@ class MetricFetcherTest { assertThat(requestMetricQueryServiceGatewaysCounter).hasValue(2); } + @Test + void testIgnoreUpdateRequestWhenFetchingMetrics() throws InterruptedException { + final long updateInterval = 1000L; + final long waitTimeBeforeReturnMetricResults = updateInterval * 2; + final Time timeout = Time.seconds(10L); + final AtomicInteger requestMetricQueryServiceGatewaysCounter = new AtomicInteger(0); + final JobID jobID = new JobID(); + final ResourceID tmRID = ResourceID.generate(); + + // Create metric fetcher + final MetricFetcher fetcher = + createMetricFetcherWithServiceGateways( + jobID, + tmRID, + timeout, + updateInterval, + waitTimeBeforeReturnMetricResults, + requestMetricQueryServiceGatewaysCounter); + + fetcher.update(); + + final long start = System.currentTimeMillis(); + long difference = 0L; + + while (difference <= updateInterval) { + Thread.sleep((int) (updateInterval * 1.5f)); + difference = System.currentTimeMillis() - start; + } + + fetcher.update(); + + assertThat(requestMetricQueryServiceGatewaysCounter).hasValue(1); + } + + private MetricFetcher createMetricFetcherWithServiceGateways( + JobID jobID, + ResourceID tmRID, + Time timeout, + long updateInterval, + long waitTimeBeforeReturnMetricResults, + @Nullable AtomicInteger requestMetricQueryServiceGatewaysCounter) { + final ExecutorService executor = java.util.concurrent.Executors.newSingleThreadExecutor(); + // ========= setup QueryServices + // ================================================================================ + + final MetricQueryServiceGateway jmQueryService = + new TestingMetricQueryServiceGateway.Builder() + .setQueryMetricsSupplier( + () -> + CompletableFuture.completedFuture( + new MetricDumpSerialization + .MetricSerializationResult( + new byte[0], + new byte[0], + new byte[0], + new byte[0], + 0, + 0, + 0, + 0))) + .build(); + + MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = + createRequestDumpAnswer(tmRID, jobID); + final MetricQueryServiceGateway tmQueryService = + new TestingMetricQueryServiceGateway.Builder() + .setQueryMetricsSupplier( + () -> { + if (waitTimeBeforeReturnMetricResults > 0) { + CompletableFuture< + MetricDumpSerialization + .MetricSerializationResult> + metricsAnswerFuture = new CompletableFuture<>(); + FutureUtils.completedVoidFuture() + .thenComposeAsync( + (ignored) -> { + try { + Thread.sleep( + waitTimeBeforeReturnMetricResults); + } catch (InterruptedException ignore) { + } + metricsAnswerFuture.complete( + requestMetricsAnswer); + return metricsAnswerFuture; + }, + executor); + return metricsAnswerFuture; + } else { + return CompletableFuture.completedFuture( + requestMetricsAnswer); + } + }) + .build(); + + // ========= setup JobManager + // ================================================================================== + + final TestingRestfulGateway restfulGateway = + new TestingRestfulGateway.Builder() + .setRequestMultipleJobDetailsSupplier( + () -> + CompletableFuture.completedFuture( + new MultipleJobsDetails(Collections.emptyList()))) + .setRequestMetricQueryServiceGatewaysSupplier( + () -> { + if (requestMetricQueryServiceGatewaysCounter != null) { + requestMetricQueryServiceGatewaysCounter.incrementAndGet(); + } + return CompletableFuture.completedFuture( + Collections.singleton(jmQueryService.getAddress())); + }) + .setRequestTaskManagerMetricQueryServiceGatewaysSupplier( + () -> + CompletableFuture.completedFuture( + Collections.singleton( + Tuple2.of( + tmRID, + tmQueryService.getAddress())))) + .build(); + + final GatewayRetriever<RestfulGateway> retriever = + () -> CompletableFuture.completedFuture(restfulGateway); + + // ========= start MetricFetcher testing + // ======================================================================= + return new MetricFetcherImpl<>( + retriever, + address -> CompletableFuture.completedFuture(tmQueryService), + Executors.directExecutor(), + timeout, + updateInterval); + } + private MetricFetcher createMetricFetcher(long updateInterval, RestfulGateway restfulGateway) { return new MetricFetcherImpl<>( () -> CompletableFuture.completedFuture(restfulGateway), @@ -286,7 +366,7 @@ class MetricFetcherTest { .setRequestMetricQueryServiceGatewaysSupplier( () -> { requestMetricQueryServiceGatewaysCounter.incrementAndGet(); - return new CompletableFuture<>(); + return CompletableFuture.completedFuture(null); }) .build(); }
