This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 992b65af7420ebacf8aa02b26269190919ce3ccd Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Fri Jan 11 15:30:01 2019 +0100 [hotfix] Introduce MetricFetcher interface Rename MetricFetcher into MetricFetcherImpl and introduce MetricFetcher interface. This allows for better testing and hides the type parameter of MetricFetcherImpl. --- .../flink/docs/rest/RestAPIDocGenerator.java | 1 + .../runtime/webmonitor/WebRuntimeMonitor.java | 8 +- .../runtime/dispatcher/DispatcherRestEndpoint.java | 1 + ...tDispatcherResourceManagerComponentFactory.java | 11 +- .../flink/runtime/minicluster/MiniCluster.java | 17 +- .../flink/runtime/rest/JobRestEndpointFactory.java | 1 + .../flink/runtime/rest/RestEndpointFactory.java | 1 + .../runtime/rest/SessionRestEndpointFactory.java | 1 + .../rest/handler/job/JobDetailsHandler.java | 8 +- .../rest/handler/job/JobVertexDetailsHandler.java | 6 +- .../handler/job/JobVertexTaskManagersHandler.java | 6 +- .../job/SubtaskCurrentAttemptDetailsHandler.java | 4 +- .../job/SubtaskExecutionAttemptDetailsHandler.java | 6 +- .../metrics/AbstractAggregatingMetricsHandler.java | 4 +- .../job/metrics/AggregatingJobsMetricsHandler.java | 2 +- .../metrics/AggregatingSubtasksMetricsHandler.java | 2 +- .../AggregatingTaskManagersMetricsHandler.java | 2 +- .../rest/handler/legacy/metrics/MetricFetcher.java | 182 +-------------------- .../{MetricFetcher.java => MetricFetcherImpl.java} | 35 +++- .../handler/legacy/metrics/VoidMetricFetcher.java | 38 +++++ .../rest/handler/util/MutableIOMetrics.java | 1 + .../runtime/webmonitor/WebMonitorEndpoint.java | 11 +- .../SubtaskCurrentAttemptDetailsHandlerTest.java | 3 +- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 3 +- .../metrics/AggregatingJobsMetricsHandlerTest.java | 2 +- .../metrics/AggregatingMetricsHandlerTestBase.java | 5 +- .../AggregatingSubtasksMetricsHandlerTest.java | 2 +- .../AggregatingTaskManagersMetricsHandlerTest.java | 2 +- .../handler/legacy/metrics/MetricFetcherTest.java | 2 +- 29 files changed, 138 insertions(+), 229 deletions(-) diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java index 98eaf39..891ae08 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index d0499ef..826b93a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl; import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.handler.router.Router; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; @@ -101,7 +102,6 @@ public class WebRuntimeMonitor implements WebMonitor { private AtomicBoolean cleanedUp = new AtomicBoolean(); - private MetricFetcher metricFetcher; public WebRuntimeMonitor( @@ -193,7 +193,11 @@ public class WebRuntimeMonitor implements WebMonitor { } else { sslFactory = null; } - metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout); + metricFetcher = new MetricFetcherImpl<>( + retriever, + queryServiceRetriever, + scheduledExecutor, + timeout); Router router = new Router(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index c6c060d..b36fd6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.WebMonitorExtension; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java index 5059476..8b593fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java @@ -61,6 +61,7 @@ import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; /** * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components. @@ -128,15 +129,17 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend 10, Time.milliseconds(50L)); + final ExecutorService executor = WebMonitorEndpoint.createExecutorService( + configuration.getInteger(RestOptions.SERVER_NUM_THREADS), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), + "DispatcherRestEndpoint"); + webMonitorEndpoint = restEndpointFactory.createRestEndpoint( configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, blobServer, - WebMonitorEndpoint.createExecutorService( - configuration.getInteger(RestOptions.SERVER_NUM_THREADS), - configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), - "DispatcherRestEndpoint"), + executor, metricQueryServiceRetriever, highAvailabilityServices.getWebMonitorLeaderElectionService(), fatalErrorHandler); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 1da7827..92738ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -97,6 +97,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -347,6 +348,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { 20, Time.milliseconds(20L)); + final ExecutorService executor = WebMonitorEndpoint.createExecutorService( + configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), + configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), + "DispatcherRestEndpoint"); + this.dispatcherRestEndpoint = new DispatcherRestEndpoint( RestServerEndpointConfiguration.fromConfiguration(configuration), dispatcherGatewayRetriever, @@ -354,24 +360,21 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { RestHandlerConfiguration.fromConfiguration(configuration), resourceManagerGatewayRetriever, blobServer.getTransientBlobService(), - WebMonitorEndpoint.createExecutorService( - configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), - configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), - "DispatcherRestEndpoint"), + executor, new AkkaQueryServiceRetriever( metricQueryServiceActorSystem, Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), haServices.getWebMonitorLeaderElectionService(), new ShutDownFatalErrorHandler()); - dispatcherRestEndpoint.start(); + this.dispatcherRestEndpoint.start(); - restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl()); + restAddressURI = new URI(this.dispatcherRestEndpoint.getRestBaseUrl()); // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info("Starting job dispatcher(s) for JobManger"); - final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint); + final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, this.dispatcherRestEndpoint); dispatcher = new StandaloneDispatcher( jobManagerRpcService, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java index 9bfc9ac..dec7768 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java index 64750e7..ff10be6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.TransientBlobService; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java index 4669745..2dcf6de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index 2a0e1fc..aefe724 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -59,7 +59,7 @@ import java.util.concurrent.Executor; */ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements JsonArchivist { - private final MetricFetcher<?> metricFetcher; + private final MetricFetcher metricFetcher; public JobDetailsHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, @@ -68,7 +68,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, - MetricFetcher<?> metricFetcher) { + MetricFetcher metricFetcher) { super( leaderRetriever, timeout, @@ -95,7 +95,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI return Collections.singleton(new ArchivedJson(path, json)); } - private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher<?> metricFetcher) { + private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher metricFetcher) { final long now = System.currentTimeMillis(); final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED); final long endTime = executionGraph.getState().isGloballyTerminalState() ? @@ -147,7 +147,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI AccessExecutionJobVertex ejv, long now, JobID jobId, - MetricFetcher<?> metricFetcher) { + MetricFetcher metricFetcher) { int[] tasksPerState = new int[ExecutionState.values().length]; long startTime = Long.MAX_VALUE; long endTime = 0; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java index 71a919a..0cab589 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java @@ -57,7 +57,7 @@ import java.util.concurrent.Executor; * Request handler for the job vertex details. */ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> implements JsonArchivist { - private final MetricFetcher<? extends RestfulGateway> metricFetcher; + private final MetricFetcher metricFetcher; public JobVertexDetailsHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, @@ -66,7 +66,7 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe MessageHeaders<EmptyRequestBody, JobVertexDetailsInfo, JobVertexMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, - MetricFetcher<? extends RestfulGateway> metricFetcher) { + MetricFetcher metricFetcher) { super( leaderRetriever, timeout, @@ -106,7 +106,7 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe return archive; } - private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) { + private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) { List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new ArrayList<>(); final long now = System.currentTimeMillis(); int num = 0; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java index 8984009..fdd9cff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java @@ -62,7 +62,7 @@ import java.util.concurrent.Executor; * runtime and metrics of all its subtasks aggregated by TaskManager. */ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> implements JsonArchivist { - private MetricFetcher<?> metricFetcher; + private MetricFetcher metricFetcher; public JobVertexTaskManagersHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, @@ -71,7 +71,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler< MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, - MetricFetcher<?> metricFetcher) { + MetricFetcher metricFetcher) { super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); this.metricFetcher = Preconditions.checkNotNull(metricFetcher); } @@ -105,7 +105,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler< return archive; } - private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) { + private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) { // Build a map that groups tasks by TaskManager Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java index f632669..3a5a5d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java @@ -46,7 +46,7 @@ import java.util.concurrent.Executor; */ public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> { - private final MetricFetcher<?> metricFetcher; + private final MetricFetcher metricFetcher; public SubtaskCurrentAttemptDetailsHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, @@ -55,7 +55,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler< MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, - MetricFetcher<?> metricFetcher) { + MetricFetcher metricFetcher) { super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java index 1caa917..c538606 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java @@ -61,7 +61,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> implements JsonArchivist { - private final MetricFetcher<?> metricFetcher; + private final MetricFetcher metricFetcher; /** * Instantiates a new subtask execution attempt details handler. @@ -80,7 +80,7 @@ public class SubtaskExecutionAttemptDetailsHandler MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, - MetricFetcher<?> metricFetcher) { + MetricFetcher metricFetcher) { super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); @@ -131,7 +131,7 @@ public class SubtaskExecutionAttemptDetailsHandler AccessExecution execution, JobID jobID, JobVertexID jobVertexID, - @Nullable MetricFetcher<?> metricFetcher) { + @Nullable MetricFetcher metricFetcher) { final MutableIOMetrics ioMetrics = new MutableIOMetrics(); ioMetrics.addIOMetrics( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java index be48d89..b482dfc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java @@ -74,7 +74,7 @@ import java.util.stream.Collectors; public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> { private final Executor executor; - private final MetricFetcher<?> fetcher; + private final MetricFetcher fetcher; protected AbstractAggregatingMetricsHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, @@ -82,7 +82,7 @@ public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggreg Map<String, String> responseHeaders, AbstractAggregatedMetricsHeaders<P> messageHeaders, Executor executor, - MetricFetcher<?> fetcher) { + MetricFetcher fetcher) { super(leaderRetriever, timeout, responseHeaders, messageHeaders); this.executor = Preconditions.checkNotNull(executor); this.fetcher = Preconditions.checkNotNull(fetcher); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java index df4483c..ead1b3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java @@ -51,7 +51,7 @@ public class AggregatingJobsMetricsHandler extends AbstractAggregatingMetricsHan GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, - MetricFetcher<?> fetcher) { + MetricFetcher fetcher) { super(leaderRetriever, timeout, responseHeaders, AggregatedJobMetricsHeaders.getInstance(), executor, fetcher); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java index bd5e83d..a2eed43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java @@ -58,7 +58,7 @@ public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetric Time timeout, Map<String, String> responseHeaders, Executor executor, - MetricFetcher<?> fetcher) { + MetricFetcher fetcher) { super(leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java index e943e2b..a638132 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java @@ -51,7 +51,7 @@ public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMe GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, - MetricFetcher<?> fetcher) { + MetricFetcher fetcher) { super(leaderRetriever, timeout, responseHeaders, AggregatedTaskManagerMetricsHeaders.getInstance(), executor, fetcher); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java index 44a6169..3afdd25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java @@ -18,193 +18,23 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; -import org.apache.flink.runtime.webmonitor.RestfulGateway; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.stream.Collectors; - -import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer; - /** * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers. * * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since * the last call has passed. */ -public class MetricFetcher<T extends RestfulGateway> { - private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class); - - private final GatewayRetriever<T> retriever; - private final MetricQueryServiceRetriever queryServiceRetriever; - private final Executor executor; - private final Time timeout; - - private final MetricStore metrics = new MetricStore(); - private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer(); - - private long lastUpdateTime; - - public MetricFetcher( - GatewayRetriever<T> retriever, - MetricQueryServiceRetriever queryServiceRetriever, - Executor executor, - Time timeout) { - this.retriever = Preconditions.checkNotNull(retriever); - this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever); - this.executor = Preconditions.checkNotNull(executor); - this.timeout = Preconditions.checkNotNull(timeout); - } - - /** - * Returns the MetricStore containing all stored metrics. - * - * @return MetricStore containing all stored metrics; - */ - public MetricStore getMetricStore() { - return metrics; - } - - /** - * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated. - */ - public void update() { - synchronized (this) { - long currentTime = System.currentTimeMillis(); - if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update - lastUpdateTime = currentTime; - fetchMetrics(); - } - } - } - - private void fetchMetrics() { - LOG.debug("Start fetching metrics."); - - try { - Optional<T> optionalLeaderGateway = retriever.getNow(); - if (optionalLeaderGateway.isPresent()) { - final T leaderGateway = optionalLeaderGateway.get(); - - /* - * Remove all metrics that belong to a job that is not running and no longer archived. - */ - CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestMultipleJobDetails(timeout); - - jobDetailsFuture.whenCompleteAsync( - (MultipleJobsDetails jobDetails, Throwable throwable) -> { - if (throwable != null) { - LOG.debug("Fetching of JobDetails failed.", throwable); - } else { - ArrayList<String> toRetain = new ArrayList<>(jobDetails.getJobs().size()); - for (JobDetails job : jobDetails.getJobs()) { - toRetain.add(job.getJobId().toString()); - } - metrics.retainJobs(toRetain); - } - }, - executor); - - CompletableFuture<Collection<String>> queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout); - - queryServicePathsFuture.whenCompleteAsync( - (Collection<String> queryServicePaths, Throwable throwable) -> { - if (throwable != null) { - LOG.warn("Requesting paths for query services failed.", throwable); - } else { - for (String queryServicePath : queryServicePaths) { - retrieveAndQueryMetrics(queryServicePath); - } - } - }, - executor); - - // TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery - // TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that - // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it be a cache with expiry time - CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway - .requestTaskManagerMetricQueryServicePaths(timeout); - - taskManagerQueryServicePathsFuture.whenCompleteAsync( - (Collection<Tuple2<ResourceID, String>> queryServicePaths, Throwable throwable) -> { - if (throwable != null) { - LOG.warn("Requesting TaskManager's path for query services failed.", throwable); - } else { - List<String> taskManagersToRetain = queryServicePaths - .stream() - .map( - (Tuple2<ResourceID, String> tuple) -> { - retrieveAndQueryMetrics(tuple.f1); - return tuple.f0.getResourceIdString(); - } - ).collect(Collectors.toList()); - - metrics.retainTaskManagers(taskManagersToRetain); - } - }, - executor); - } - } catch (Exception e) { - LOG.warn("Exception while fetching metrics.", e); - } - } +public interface MetricFetcher { /** - * Retrieves and queries the specified QueryServiceGateway. + * Get {@link MetricStore} which contains all currently fetched metrics. * - * @param queryServicePath specifying the QueryServiceGateway + * @return {@link MetricStore} with all fetched metrics */ - private void retrieveAndQueryMetrics(String queryServicePath) { - LOG.debug("Retrieve metric query service gateway for {}", queryServicePath); - - final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath); - - queryServiceGatewayFuture.whenCompleteAsync( - (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> { - if (t != null) { - LOG.debug("Could not retrieve QueryServiceGateway.", t); - } else { - queryMetrics(queryServiceGateway); - } - }, - executor); - } + MetricStore getMetricStore(); /** - * Query the metrics from the given QueryServiceGateway. - * - * @param queryServiceGateway to query for metrics + * Trigger fetching of metrics. */ - private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) { - LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress()); - - queryServiceGateway - .queryMetrics(timeout) - .whenCompleteAsync( - (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> { - if (t != null) { - LOG.debug("Fetching metrics failed.", t); - } else { - metrics.addAll(deserializer.deserialize(result)); - } - }, - executor); - } + void update(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java similarity index 86% copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java index 44a6169..861f512 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -33,24 +35,26 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer; /** - * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers. + * Implementation of {@link MetricFetcher} which fetches metrics from the {@link MetricQueryServiceGateway}. * - * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since - * the last call has passed. + * @param <T> type of the {@link RestfulGateway} from which to retrieve the metric query service path. */ -public class MetricFetcher<T extends RestfulGateway> { - private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class); +public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetcher { + private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherImpl.class); private final GatewayRetriever<T> retriever; private final MetricQueryServiceRetriever queryServiceRetriever; @@ -62,7 +66,7 @@ public class MetricFetcher<T extends RestfulGateway> { private long lastUpdateTime; - public MetricFetcher( + public MetricFetcherImpl( GatewayRetriever<T> retriever, MetricQueryServiceRetriever queryServiceRetriever, Executor executor, @@ -78,6 +82,7 @@ public class MetricFetcher<T extends RestfulGateway> { * * @return MetricStore containing all stored metrics; */ + @Override public MetricStore getMetricStore() { return metrics; } @@ -85,10 +90,11 @@ public class MetricFetcher<T extends RestfulGateway> { /** * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated. */ + @Override public void update() { synchronized (this) { long currentTime = System.currentTimeMillis(); - if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update + if (currentTime - lastUpdateTime > 10000L) { lastUpdateTime = currentTime; fetchMetrics(); } @@ -207,4 +213,19 @@ public class MetricFetcher<T extends RestfulGateway> { }, executor); } + + @Nonnull + public static <T extends RestfulGateway> MetricFetcherImpl<T> fromConfiguration( + final Configuration configuration, + final MetricQueryServiceRetriever metricQueryServiceRetriever, + final GatewayRetriever<T> dispatcherGatewayRetriever, + final ExecutorService executor) { + final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); + + return new MetricFetcherImpl<>( + dispatcherGatewayRetriever, + metricQueryServiceRetriever, + executor, + timeout); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java new file mode 100644 index 0000000..28d5450 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +/** + * No-op implementation of the {@link MetricFetcher}. + */ +public enum VoidMetricFetcher implements MetricFetcher { + INSTANCE; + + private static final MetricStore METRIC_STORE = new MetricStore(); + + @Override + public MetricStore getMetricStore() { + return METRIC_STORE; + } + + @Override + public void update() { + // noop + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java index 6822417..948ec93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index ec14aa3..f215ab8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -77,6 +77,7 @@ import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandle import org.apache.flink.runtime.rest.handler.legacy.files.StdoutFileHandlerSpecification; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler; @@ -157,7 +158,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp private final ExecutionGraphCache executionGraphCache; private final CheckpointStatsCache checkpointStatsCache; - private final MetricFetcher<? extends T> metricFetcher; + private final MetricFetcher metricFetcher; private final LeaderElectionService leaderElectionService; @@ -193,11 +194,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp this.checkpointStatsCache = new CheckpointStatsCache( restConfiguration.getMaxCheckpointStatisticCacheEntries()); - this.metricFetcher = new MetricFetcher<>( - leaderRetriever, + this.metricFetcher = MetricFetcherImpl.fromConfiguration( + clusterConfiguration, metricQueryServiceRetriever, - executor, - restConfiguration.getTimeout()); + leaderRetriever, + executor); this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService); this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java index f00e49f..10d670e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; @@ -118,7 +119,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { // Instance the handler. final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration()); - final MetricFetcher<?> metricFetcher = new MetricFetcher<>( + final MetricFetcher metricFetcher = new MetricFetcherImpl<>( () -> null, path -> null, TestingUtils.defaultExecutor(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java index 59c6b38..6160d4a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; @@ -116,7 +117,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger { emptyAccumulators); // Change some fields so we can make it different from other sub tasks. - final MetricFetcher<?> metricFetcher = new MetricFetcher<>( + final MetricFetcher metricFetcher = new MetricFetcherImpl<>( () -> null, path -> null, TestingUtils.defaultExecutor(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java index 76f81dc..a01dd78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java @@ -67,7 +67,7 @@ public class AggregatingJobsMetricsHandlerTest extends AggregatingMetricsHandler } @Override - protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) { + protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) { return new AggregatingJobsMetricsHandler( leaderRetriever, timeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java index 7db669a..41e5a06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; @@ -88,7 +89,7 @@ public abstract class AggregatingMetricsHandlerTestBase< @Before public void setUp() throws Exception { - MetricFetcher<RestfulGateway> fetcher = new MetricFetcher<RestfulGateway>( + MetricFetcher fetcher = new MetricFetcherImpl<RestfulGateway>( mock(GatewayRetriever.class), mock(MetricQueryServiceRetriever.class), Executors.directExecutor(), @@ -122,7 +123,7 @@ public abstract class AggregatingMetricsHandlerTestBase< Time timeout, Map<String, String> responseHeaders, Executor executor, - MetricFetcher<?> fetcher + MetricFetcher fetcher ); @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java index 3c6c9f1..06b5b71 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java @@ -79,7 +79,7 @@ public class AggregatingSubtasksMetricsHandlerTest extends AggregatingMetricsHan } @Override - protected AggregatingSubtasksMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) { + protected AggregatingSubtasksMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) { return new AggregatingSubtasksMetricsHandler( leaderRetriever, timeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java index 14d3777..4e2cf24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java @@ -68,7 +68,7 @@ public class AggregatingTaskManagersMetricsHandlerTest extends AggregatingMetric } @Override - protected AggregatingTaskManagersMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) { + protected AggregatingTaskManagersMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) { return new AggregatingTaskManagersMetricsHandler( leaderRetriever, timeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index 61c028f..e1ec719 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -108,7 +108,7 @@ public class MetricFetcherTest extends TestLogger { when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService)); // ========= start MetricFetcher testing ======================================================================= - MetricFetcher fetcher = new MetricFetcher<>( + MetricFetcher fetcher = new MetricFetcherImpl<>( retriever, queryServiceRetriever, Executors.directExecutor(),