This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 992b65af7420ebacf8aa02b26269190919ce3ccd
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Fri Jan 11 15:30:01 2019 +0100

    [hotfix] Introduce MetricFetcher interface
    
    Rename MetricFetcher into MetricFetcherImpl and introduce MetricFetcher 
interface.
    This allows for better testing and hides the type parameter of 
MetricFetcherImpl.
---
 .../flink/docs/rest/RestAPIDocGenerator.java       |   1 +
 .../runtime/webmonitor/WebRuntimeMonitor.java      |   8 +-
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   1 +
 ...tDispatcherResourceManagerComponentFactory.java |  11 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  17 +-
 .../flink/runtime/rest/JobRestEndpointFactory.java |   1 +
 .../flink/runtime/rest/RestEndpointFactory.java    |   1 +
 .../runtime/rest/SessionRestEndpointFactory.java   |   1 +
 .../rest/handler/job/JobDetailsHandler.java        |   8 +-
 .../rest/handler/job/JobVertexDetailsHandler.java  |   6 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |   6 +-
 .../job/SubtaskCurrentAttemptDetailsHandler.java   |   4 +-
 .../job/SubtaskExecutionAttemptDetailsHandler.java |   6 +-
 .../metrics/AbstractAggregatingMetricsHandler.java |   4 +-
 .../job/metrics/AggregatingJobsMetricsHandler.java |   2 +-
 .../metrics/AggregatingSubtasksMetricsHandler.java |   2 +-
 .../AggregatingTaskManagersMetricsHandler.java     |   2 +-
 .../rest/handler/legacy/metrics/MetricFetcher.java | 182 +--------------------
 .../{MetricFetcher.java => MetricFetcherImpl.java} |  35 +++-
 .../handler/legacy/metrics/VoidMetricFetcher.java  |  38 +++++
 .../rest/handler/util/MutableIOMetrics.java        |   1 +
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  11 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   3 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   3 +-
 .../metrics/AggregatingJobsMetricsHandlerTest.java |   2 +-
 .../metrics/AggregatingMetricsHandlerTestBase.java |   5 +-
 .../AggregatingSubtasksMetricsHandlerTest.java     |   2 +-
 .../AggregatingTaskManagersMetricsHandlerTest.java |   2 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   2 +-
 29 files changed, 138 insertions(+), 229 deletions(-)

diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 98eaf39..891ae08 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index d0499ef..826b93a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -101,7 +102,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private AtomicBoolean cleanedUp = new AtomicBoolean();
 
-
        private MetricFetcher metricFetcher;
 
        public WebRuntimeMonitor(
@@ -193,7 +193,11 @@ public class WebRuntimeMonitor implements WebMonitor {
                } else {
                        sslFactory = null;
                }
-               metricFetcher = new MetricFetcher(retriever, 
queryServiceRetriever, scheduledExecutor, timeout);
+               metricFetcher = new MetricFetcherImpl<>(
+                       retriever,
+                       queryServiceRetriever,
+                       scheduledExecutor,
+                       timeout);
 
                Router router = new Router();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index c6c060d..b36fd6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 5059476..8b593fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -61,6 +61,7 @@ import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Abstract class which implements the creation of the {@link 
DispatcherResourceManagerComponent} components.
@@ -128,15 +129,17 @@ public abstract class 
AbstractDispatcherResourceManagerComponentFactory<T extend
                                10,
                                Time.milliseconds(50L));
 
+                       final ExecutorService executor = 
WebMonitorEndpoint.createExecutorService(
+                               
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+                               
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
+                               "DispatcherRestEndpoint");
+
                        webMonitorEndpoint = 
restEndpointFactory.createRestEndpoint(
                                configuration,
                                dispatcherGatewayRetriever,
                                resourceManagerGatewayRetriever,
                                blobServer,
-                               WebMonitorEndpoint.createExecutorService(
-                                       
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
-                                       
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
-                                       "DispatcherRestEndpoint"),
+                               executor,
                                metricQueryServiceRetriever,
                                
highAvailabilityServices.getWebMonitorLeaderElectionService(),
                                fatalErrorHandler);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1da7827..92738ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -97,6 +97,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -347,6 +348,11 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                        20,
                                        Time.milliseconds(20L));
 
+                               final ExecutorService executor = 
WebMonitorEndpoint.createExecutorService(
+                                       
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+                                       
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
+                                       "DispatcherRestEndpoint");
+
                                this.dispatcherRestEndpoint = new 
DispatcherRestEndpoint(
                                        
RestServerEndpointConfiguration.fromConfiguration(configuration),
                                        dispatcherGatewayRetriever,
@@ -354,24 +360,21 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                        
RestHandlerConfiguration.fromConfiguration(configuration),
                                        resourceManagerGatewayRetriever,
                                        blobServer.getTransientBlobService(),
-                                       
WebMonitorEndpoint.createExecutorService(
-                                               
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
-                                               
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
-                                               "DispatcherRestEndpoint"),
+                                       executor,
                                        new AkkaQueryServiceRetriever(
                                                metricQueryServiceActorSystem,
                                                
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                                        
haServices.getWebMonitorLeaderElectionService(),
                                        new ShutDownFatalErrorHandler());
 
-                               dispatcherRestEndpoint.start();
+                               this.dispatcherRestEndpoint.start();
 
-                               restAddressURI = new 
URI(dispatcherRestEndpoint.getRestBaseUrl());
+                               restAddressURI = new 
URI(this.dispatcherRestEndpoint.getRestBaseUrl());
 
                                // bring up the dispatcher that launches 
JobManagers when jobs submitted
                                LOG.info("Starting job dispatcher(s) for 
JobManger");
 
-                               final HistoryServerArchivist 
historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
dispatcherRestEndpoint);
+                               final HistoryServerArchivist 
historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
this.dispatcherRestEndpoint);
 
                                dispatcher = new StandaloneDispatcher(
                                        jobManagerRpcService,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
index 9bfc9ac..dec7768 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index 64750e7..ff10be6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index 4669745..2dcf6de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 2a0e1fc..aefe724 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Executor;
  */
 public class JobDetailsHandler extends 
AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements 
JsonArchivist {
 
-       private final MetricFetcher<?> metricFetcher;
+       private final MetricFetcher metricFetcher;
 
        public JobDetailsHandler(
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
@@ -68,7 +68,7 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphHandler<JobDetailsI
                        MessageHeaders<EmptyRequestBody, JobDetailsInfo, 
JobMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor,
-                       MetricFetcher<?> metricFetcher) {
+                       MetricFetcher metricFetcher) {
                super(
                        leaderRetriever,
                        timeout,
@@ -95,7 +95,7 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphHandler<JobDetailsI
                return Collections.singleton(new ArchivedJson(path, json));
        }
 
-       private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph 
executionGraph, @Nullable MetricFetcher<?> metricFetcher) {
+       private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph 
executionGraph, @Nullable MetricFetcher metricFetcher) {
                final long now = System.currentTimeMillis();
                final long startTime = 
executionGraph.getStatusTimestamp(JobStatus.CREATED);
                final long endTime = 
executionGraph.getState().isGloballyTerminalState() ?
@@ -147,7 +147,7 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphHandler<JobDetailsI
                        AccessExecutionJobVertex ejv,
                        long now,
                        JobID jobId,
-                       MetricFetcher<?> metricFetcher) {
+                       MetricFetcher metricFetcher) {
                int[] tasksPerState = new int[ExecutionState.values().length];
                long startTime = Long.MAX_VALUE;
                long endTime = 0;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index 71a919a..0cab589 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -57,7 +57,7 @@ import java.util.concurrent.Executor;
  * Request handler for the job vertex details.
  */
 public class JobVertexDetailsHandler extends 
AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> 
implements JsonArchivist {
-       private final MetricFetcher<? extends RestfulGateway> metricFetcher;
+       private final MetricFetcher metricFetcher;
 
        public JobVertexDetailsHandler(
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
@@ -66,7 +66,7 @@ public class JobVertexDetailsHandler extends 
AbstractExecutionGraphHandler<JobVe
                        MessageHeaders<EmptyRequestBody, JobVertexDetailsInfo, 
JobVertexMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor,
-                       MetricFetcher<? extends RestfulGateway> metricFetcher) {
+                       MetricFetcher metricFetcher) {
                super(
                        leaderRetriever,
                        timeout,
@@ -106,7 +106,7 @@ public class JobVertexDetailsHandler extends 
AbstractExecutionGraphHandler<JobVe
                return archive;
        }
 
-       private static JobVertexDetailsInfo 
createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, 
@Nullable MetricFetcher<?> metricFetcher) {
+       private static JobVertexDetailsInfo 
createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, 
@Nullable MetricFetcher metricFetcher) {
                List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new 
ArrayList<>();
                final long now = System.currentTimeMillis();
                int num = 0;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 8984009..fdd9cff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -62,7 +62,7 @@ import java.util.concurrent.Executor;
  * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
 public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, 
JobVertexMessageParameters> implements JsonArchivist {
-       private MetricFetcher<?> metricFetcher;
+       private MetricFetcher metricFetcher;
 
        public JobVertexTaskManagersHandler(
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
@@ -71,7 +71,7 @@ public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<
                        MessageHeaders<EmptyRequestBody, 
JobVertexTaskManagersInfo, JobVertexMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor,
-                       MetricFetcher<?> metricFetcher) {
+                       MetricFetcher metricFetcher) {
                super(leaderRetriever, timeout, responseHeaders, 
messageHeaders, executionGraphCache, executor);
                this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
        }
@@ -105,7 +105,7 @@ public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<
                return archive;
        }
 
-       private static JobVertexTaskManagersInfo 
createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID 
jobID, @Nullable MetricFetcher<?> metricFetcher) {
+       private static JobVertexTaskManagersInfo 
createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID 
jobID, @Nullable MetricFetcher metricFetcher) {
                // Build a map that groups tasks by TaskManager
                Map<String, List<AccessExecutionVertex>> taskManagerVertices = 
new HashMap<>();
                for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
index f632669..3a5a5d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -46,7 +46,7 @@ import java.util.concurrent.Executor;
  */
 public class SubtaskCurrentAttemptDetailsHandler extends 
AbstractSubtaskHandler<SubtaskExecutionAttemptDetailsInfo, 
SubtaskMessageParameters> {
 
-       private final MetricFetcher<?> metricFetcher;
+       private final MetricFetcher metricFetcher;
 
        public SubtaskCurrentAttemptDetailsHandler(
                GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -55,7 +55,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends 
AbstractSubtaskHandler<
                MessageHeaders<EmptyRequestBody, 
SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> messageHeaders,
                ExecutionGraphCache executionGraphCache,
                Executor executor,
-               MetricFetcher<?> metricFetcher) {
+               MetricFetcher metricFetcher) {
 
                super(leaderRetriever, timeout, responseHeaders, 
messageHeaders, executionGraphCache, executor);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index 1caa917..c538606 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -61,7 +61,7 @@ public class SubtaskExecutionAttemptDetailsHandler
        extends 
AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, 
SubtaskAttemptMessageParameters>
        implements JsonArchivist {
 
-       private final MetricFetcher<?> metricFetcher;
+       private final MetricFetcher metricFetcher;
 
        /**
         * Instantiates a new subtask execution attempt details handler.
@@ -80,7 +80,7 @@ public class SubtaskExecutionAttemptDetailsHandler
                        MessageHeaders<EmptyRequestBody, 
SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> 
messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor,
-                       MetricFetcher<?> metricFetcher) {
+                       MetricFetcher metricFetcher) {
 
                super(leaderRetriever, timeout, responseHeaders, 
messageHeaders, executionGraphCache, executor);
 
@@ -131,7 +131,7 @@ public class SubtaskExecutionAttemptDetailsHandler
                        AccessExecution execution,
                        JobID jobID,
                        JobVertexID jobVertexID,
-                       @Nullable MetricFetcher<?> metricFetcher) {
+                       @Nullable MetricFetcher metricFetcher) {
                final MutableIOMetrics ioMetrics = new MutableIOMetrics();
 
                ioMetrics.addIOMetrics(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
index be48d89..b482dfc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -74,7 +74,7 @@ import java.util.stream.Collectors;
 public abstract class AbstractAggregatingMetricsHandler<P extends 
AbstractAggregatedMetricsParameters<?>> extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, 
AggregatedMetricsResponseBody, P> {
 
        private final Executor executor;
-       private final MetricFetcher<?> fetcher;
+       private final MetricFetcher fetcher;
 
        protected AbstractAggregatingMetricsHandler(
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
@@ -82,7 +82,7 @@ public abstract class AbstractAggregatingMetricsHandler<P 
extends AbstractAggreg
                        Map<String, String> responseHeaders,
                        AbstractAggregatedMetricsHeaders<P> messageHeaders,
                        Executor executor,
-                       MetricFetcher<?> fetcher) {
+                       MetricFetcher fetcher) {
                super(leaderRetriever, timeout, responseHeaders, 
messageHeaders);
                this.executor = Preconditions.checkNotNull(executor);
                this.fetcher = Preconditions.checkNotNull(fetcher);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
index df4483c..ead1b3d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
@@ -51,7 +51,7 @@ public class AggregatingJobsMetricsHandler extends 
AbstractAggregatingMetricsHan
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout, Map<String, String> responseHeaders,
                        Executor executor,
-                       MetricFetcher<?> fetcher) {
+                       MetricFetcher fetcher) {
                super(leaderRetriever, timeout, responseHeaders, 
AggregatedJobMetricsHeaders.getInstance(), executor, fetcher);
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
index bd5e83d..a2eed43 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
@@ -58,7 +58,7 @@ public class AggregatingSubtasksMetricsHandler extends 
AbstractAggregatingMetric
                        Time timeout,
                        Map<String, String> responseHeaders,
                        Executor executor,
-                       MetricFetcher<?> fetcher) {
+                       MetricFetcher fetcher) {
                super(leaderRetriever, timeout, responseHeaders, 
AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
index e943e2b..a638132 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
@@ -51,7 +51,7 @@ public class AggregatingTaskManagersMetricsHandler extends 
AbstractAggregatingMe
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout, Map<String, String> responseHeaders,
                        Executor executor,
-                       MetricFetcher<?> fetcher) {
+                       MetricFetcher fetcher) {
                super(leaderRetriever, timeout, responseHeaders, 
AggregatedTaskManagerMetricsHeaders.getInstance(), executor, fetcher);
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 44a6169..3afdd25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -18,193 +18,23 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-
-import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
-
 /**
  * The MetricFetcher can be used to fetch metrics from the JobManager and all 
registered TaskManagers.
  *
  * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is 
called, provided that a sufficient time since
  * the last call has passed.
  */
-public class MetricFetcher<T extends RestfulGateway> {
-       private static final Logger LOG = 
LoggerFactory.getLogger(MetricFetcher.class);
-
-       private final GatewayRetriever<T> retriever;
-       private final MetricQueryServiceRetriever queryServiceRetriever;
-       private final Executor executor;
-       private final Time timeout;
-
-       private final MetricStore metrics = new MetricStore();
-       private final MetricDumpDeserializer deserializer = new 
MetricDumpDeserializer();
-
-       private long lastUpdateTime;
-
-       public MetricFetcher(
-                       GatewayRetriever<T> retriever,
-                       MetricQueryServiceRetriever queryServiceRetriever,
-                       Executor executor,
-                       Time timeout) {
-               this.retriever = Preconditions.checkNotNull(retriever);
-               this.queryServiceRetriever = 
Preconditions.checkNotNull(queryServiceRetriever);
-               this.executor = Preconditions.checkNotNull(executor);
-               this.timeout = Preconditions.checkNotNull(timeout);
-       }
-
-       /**
-        * Returns the MetricStore containing all stored metrics.
-        *
-        * @return MetricStore containing all stored metrics;
-        */
-       public MetricStore getMetricStore() {
-               return metrics;
-       }
-
-       /**
-        * This method can be used to signal this MetricFetcher that the 
metrics are still in use and should be updated.
-        */
-       public void update() {
-               synchronized (this) {
-                       long currentTime = System.currentTimeMillis();
-                       if (currentTime - lastUpdateTime > 10000) { // 10 
seconds have passed since the last update
-                               lastUpdateTime = currentTime;
-                               fetchMetrics();
-                       }
-               }
-       }
-
-       private void fetchMetrics() {
-               LOG.debug("Start fetching metrics.");
-
-               try {
-                       Optional<T> optionalLeaderGateway = retriever.getNow();
-                       if (optionalLeaderGateway.isPresent()) {
-                               final T leaderGateway = 
optionalLeaderGateway.get();
-
-                               /*
-                                * Remove all metrics that belong to a job that 
is not running and no longer archived.
-                                */
-                               CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = leaderGateway.requestMultipleJobDetails(timeout);
-
-                               jobDetailsFuture.whenCompleteAsync(
-                                       (MultipleJobsDetails jobDetails, 
Throwable throwable) -> {
-                                               if (throwable != null) {
-                                                       LOG.debug("Fetching of 
JobDetails failed.", throwable);
-                                               } else {
-                                                       ArrayList<String> 
toRetain = new ArrayList<>(jobDetails.getJobs().size());
-                                                       for (JobDetails job : 
jobDetails.getJobs()) {
-                                                               
toRetain.add(job.getJobId().toString());
-                                                       }
-                                                       
metrics.retainJobs(toRetain);
-                                               }
-                                       },
-                                       executor);
-
-                               CompletableFuture<Collection<String>> 
queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout);
-
-                               queryServicePathsFuture.whenCompleteAsync(
-                                       (Collection<String> queryServicePaths, 
Throwable throwable) -> {
-                                               if (throwable != null) {
-                                                       LOG.warn("Requesting 
paths for query services failed.", throwable);
-                                               } else {
-                                                       for (String 
queryServicePath : queryServicePaths) {
-                                                               
retrieveAndQueryMetrics(queryServicePath);
-                                                       }
-                                               }
-                                       },
-                                       executor);
-
-                               // TODO: Once the old code has been ditched, 
remove the explicit TaskManager query service discovery
-                               // TODO: and return it as part of 
requestQueryServicePaths. Moreover, change the MetricStore such that
-                               // TODO: we don't have to explicitly retain the 
valid TaskManagers, e.g. letting it be a cache with expiry time
-                               CompletableFuture<Collection<Tuple2<ResourceID, 
String>>> taskManagerQueryServicePathsFuture = leaderGateway
-                                       
.requestTaskManagerMetricQueryServicePaths(timeout);
-
-                               
taskManagerQueryServicePathsFuture.whenCompleteAsync(
-                                       (Collection<Tuple2<ResourceID, String>> 
queryServicePaths, Throwable throwable) -> {
-                                               if (throwable != null) {
-                                                       LOG.warn("Requesting 
TaskManager's path for query services failed.", throwable);
-                                               } else {
-                                                       List<String> 
taskManagersToRetain = queryServicePaths
-                                                               .stream()
-                                                               .map(
-                                                                       
(Tuple2<ResourceID, String> tuple) -> {
-                                                                               
retrieveAndQueryMetrics(tuple.f1);
-                                                                               
return tuple.f0.getResourceIdString();
-                                                                       }
-                                                               
).collect(Collectors.toList());
-
-                                                       
metrics.retainTaskManagers(taskManagersToRetain);
-                                               }
-                                       },
-                                       executor);
-                       }
-               } catch (Exception e) {
-                       LOG.warn("Exception while fetching metrics.", e);
-               }
-       }
+public interface MetricFetcher {
 
        /**
-        * Retrieves and queries the specified QueryServiceGateway.
+        * Get {@link MetricStore} which contains all currently fetched metrics.
         *
-        * @param queryServicePath specifying the QueryServiceGateway
+        * @return {@link MetricStore} with all fetched metrics
         */
-       private void retrieveAndQueryMetrics(String queryServicePath) {
-               LOG.debug("Retrieve metric query service gateway for {}", 
queryServicePath);
-
-               final CompletableFuture<MetricQueryServiceGateway> 
queryServiceGatewayFuture = 
queryServiceRetriever.retrieveService(queryServicePath);
-
-               queryServiceGatewayFuture.whenCompleteAsync(
-                       (MetricQueryServiceGateway queryServiceGateway, 
Throwable t) -> {
-                               if (t != null) {
-                                       LOG.debug("Could not retrieve 
QueryServiceGateway.", t);
-                               } else {
-                                       queryMetrics(queryServiceGateway);
-                               }
-                       },
-                       executor);
-       }
+       MetricStore getMetricStore();
 
        /**
-        * Query the metrics from the given QueryServiceGateway.
-        *
-        * @param queryServiceGateway to query for metrics
+        * Trigger fetching of metrics.
         */
-       private void queryMetrics(final MetricQueryServiceGateway 
queryServiceGateway) {
-               LOG.debug("Query metrics for {}.", 
queryServiceGateway.getAddress());
-
-               queryServiceGateway
-                       .queryMetrics(timeout)
-                       .whenCompleteAsync(
-                               
(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
-                                       if (t != null) {
-                                               LOG.debug("Fetching metrics 
failed.", t);
-                                       } else {
-                                               
metrics.addAll(deserializer.deserialize(result));
-                                       }
-                               },
-                               executor);
-       }
+       void update();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
similarity index 86%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index 44a6169..861f512 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -33,24 +35,26 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
 
 /**
- * The MetricFetcher can be used to fetch metrics from the JobManager and all 
registered TaskManagers.
+ * Implementation of {@link MetricFetcher} which fetches metrics from the 
{@link MetricQueryServiceGateway}.
  *
- * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is 
called, provided that a sufficient time since
- * the last call has passed.
+ * @param <T> type of the {@link RestfulGateway} from which to retrieve the 
metric query service path.
  */
-public class MetricFetcher<T extends RestfulGateway> {
-       private static final Logger LOG = 
LoggerFactory.getLogger(MetricFetcher.class);
+public class MetricFetcherImpl<T extends RestfulGateway> implements 
MetricFetcher {
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetricFetcherImpl.class);
 
        private final GatewayRetriever<T> retriever;
        private final MetricQueryServiceRetriever queryServiceRetriever;
@@ -62,7 +66,7 @@ public class MetricFetcher<T extends RestfulGateway> {
 
        private long lastUpdateTime;
 
-       public MetricFetcher(
+       public MetricFetcherImpl(
                        GatewayRetriever<T> retriever,
                        MetricQueryServiceRetriever queryServiceRetriever,
                        Executor executor,
@@ -78,6 +82,7 @@ public class MetricFetcher<T extends RestfulGateway> {
         *
         * @return MetricStore containing all stored metrics;
         */
+       @Override
        public MetricStore getMetricStore() {
                return metrics;
        }
@@ -85,10 +90,11 @@ public class MetricFetcher<T extends RestfulGateway> {
        /**
         * This method can be used to signal this MetricFetcher that the 
metrics are still in use and should be updated.
         */
+       @Override
        public void update() {
                synchronized (this) {
                        long currentTime = System.currentTimeMillis();
-                       if (currentTime - lastUpdateTime > 10000) { // 10 
seconds have passed since the last update
+                       if (currentTime - lastUpdateTime > 10000L) {
                                lastUpdateTime = currentTime;
                                fetchMetrics();
                        }
@@ -207,4 +213,19 @@ public class MetricFetcher<T extends RestfulGateway> {
                                },
                                executor);
        }
+
+       @Nonnull
+       public static <T extends RestfulGateway> MetricFetcherImpl<T> 
fromConfiguration(
+                       final Configuration configuration,
+                       final MetricQueryServiceRetriever 
metricQueryServiceRetriever,
+                       final GatewayRetriever<T> dispatcherGatewayRetriever,
+                       final ExecutorService executor) {
+               final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+               return new MetricFetcherImpl<>(
+                       dispatcherGatewayRetriever,
+                       metricQueryServiceRetriever,
+                       executor,
+                       timeout);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
new file mode 100644
index 0000000..28d5450
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+/**
+ * No-op implementation of the {@link MetricFetcher}.
+ */
+public enum VoidMetricFetcher implements MetricFetcher {
+       INSTANCE;
+
+       private static final MetricStore METRIC_STORE = new MetricStore();
+
+       @Override
+       public MetricStore getMetricStore() {
+               return METRIC_STORE;
+       }
+
+       @Override
+       public void update() {
+               // noop
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 6822417..948ec93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index ec14aa3..f215ab8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -77,6 +77,7 @@ import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandle
 import 
org.apache.flink.runtime.rest.handler.legacy.files.StdoutFileHandlerSpecification;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
 import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
 import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
@@ -157,7 +158,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
        private final ExecutionGraphCache executionGraphCache;
        private final CheckpointStatsCache checkpointStatsCache;
 
-       private final MetricFetcher<? extends T> metricFetcher;
+       private final MetricFetcher metricFetcher;
 
        private final LeaderElectionService leaderElectionService;
 
@@ -193,11 +194,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                this.checkpointStatsCache = new CheckpointStatsCache(
                        
restConfiguration.getMaxCheckpointStatisticCacheEntries());
 
-               this.metricFetcher = new MetricFetcher<>(
-                       leaderRetriever,
+               this.metricFetcher = MetricFetcherImpl.fromConfiguration(
+                       clusterConfiguration,
                        metricQueryServiceRetriever,
-                       executor,
-                       restConfiguration.getTimeout());
+                       leaderRetriever,
+                       executor);
 
                this.leaderElectionService = 
Preconditions.checkNotNull(leaderElectionService);
                this.fatalErrorHandler = 
Preconditions.checkNotNull(fatalErrorHandler);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index f00e49f..10d670e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -118,7 +119,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest 
extends TestLogger {
                // Instance the handler.
                final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(new Configuration());
 
-               final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+               final MetricFetcher metricFetcher = new MetricFetcherImpl<>(
                        () -> null,
                        path -> null,
                        TestingUtils.defaultExecutor(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 59c6b38..6160d4a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -116,7 +117,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest 
extends TestLogger {
                        emptyAccumulators);
 
                // Change some fields so we can make it different from other 
sub tasks.
-               final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+               final MetricFetcher metricFetcher = new MetricFetcherImpl<>(
                        () -> null,
                        path -> null,
                        TestingUtils.defaultExecutor(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
index 76f81dc..a01dd78 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
@@ -67,7 +67,7 @@ public class AggregatingJobsMetricsHandlerTest extends 
AggregatingMetricsHandler
        }
 
        @Override
-       protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> 
responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+       protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> 
responseHeaders, Executor executor, MetricFetcher fetcher) {
                return new AggregatingJobsMetricsHandler(
                        leaderRetriever,
                        timeout,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index 7db669a..41e5a06 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
@@ -88,7 +89,7 @@ public abstract class AggregatingMetricsHandlerTestBase<
 
        @Before
        public void setUp() throws Exception {
-               MetricFetcher<RestfulGateway> fetcher = new 
MetricFetcher<RestfulGateway>(
+               MetricFetcher fetcher = new MetricFetcherImpl<RestfulGateway>(
                        mock(GatewayRetriever.class),
                        mock(MetricQueryServiceRetriever.class),
                        Executors.directExecutor(),
@@ -122,7 +123,7 @@ public abstract class AggregatingMetricsHandlerTestBase<
                Time timeout,
                Map<String, String> responseHeaders,
                Executor executor,
-               MetricFetcher<?> fetcher
+               MetricFetcher fetcher
        );
 
        @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
index 3c6c9f1..06b5b71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -79,7 +79,7 @@ public class AggregatingSubtasksMetricsHandlerTest extends 
AggregatingMetricsHan
        }
 
        @Override
-       protected AggregatingSubtasksMetricsHandler 
getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time 
timeout, Map<String, String> responseHeaders, Executor executor, 
MetricFetcher<?> fetcher) {
+       protected AggregatingSubtasksMetricsHandler 
getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time 
timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher 
fetcher) {
                return new AggregatingSubtasksMetricsHandler(
                        leaderRetriever,
                        timeout,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
index 14d3777..4e2cf24 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
@@ -68,7 +68,7 @@ public class AggregatingTaskManagersMetricsHandlerTest 
extends AggregatingMetric
        }
 
        @Override
-       protected AggregatingTaskManagersMetricsHandler 
getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time 
timeout, Map<String, String> responseHeaders, Executor executor, 
MetricFetcher<?> fetcher) {
+       protected AggregatingTaskManagersMetricsHandler 
getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time 
timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher 
fetcher) {
                return new AggregatingTaskManagersMetricsHandler(
                        leaderRetriever,
                        timeout,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 61c028f..e1ec719 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -108,7 +108,7 @@ public class MetricFetcherTest extends TestLogger {
                
when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
 
                // ========= start MetricFetcher testing 
=======================================================================
-               MetricFetcher fetcher = new MetricFetcher<>(
+               MetricFetcher fetcher = new MetricFetcherImpl<>(
                        retriever,
                        queryServiceRetriever,
                        Executors.directExecutor(),

Reply via email to