[FLINK-7781][metrics][REST] Support on-demand aggregations
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d48b208a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d48b208a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d48b208a Branch: refs/heads/master Commit: d48b208a7207bc50ba76019b7cb71039573bd26e Parents: 4207eaa Author: zentol <[email protected]> Authored: Wed Oct 25 11:21:28 2017 +0200 Committer: zentol <[email protected]> Committed: Tue Oct 31 11:53:44 2017 +0100 ---------------------------------------------------------------------- .../runtime/webmonitor/WebRuntimeMonitor.java | 23 +- .../SubtaskExecutionAttemptDetailsHandler.java | 1 + .../AbstractAggregatingMetricsHandler.java | 224 ++++++++++++++++ .../legacy/metrics/AbstractMetricsHandler.java | 9 +- .../metrics/AggregatingJobsMetricsHandler.java | 57 ++++ .../AggregatingSubtasksMetricsHandler.java | 122 +++++++++ .../AggregatingTaskManagersMetricsHandler.java | 57 ++++ .../legacy/metrics/DoubleAccumulator.java | 257 +++++++++++++++++++ .../metrics/JobManagerMetricsHandler.java | 2 +- .../legacy/metrics/JobMetricsHandler.java | 2 +- .../legacy/metrics/JobVertexMetricsHandler.java | 5 +- .../handler/legacy/metrics/MetricStore.java | 33 ++- .../legacy/metrics/SubtaskMetricsHandler.java | 70 +++++ .../metrics/TaskManagerMetricsHandler.java | 2 +- .../AbstractAggregatingMetricsHandlerTest.java | 186 ++++++++++++++ .../metrics/AbstractMetricsHandlerTest.java | 2 + .../AggregatingJobsMetricsHandlerTest.java | 66 +++++ .../AggregatingSubtasksMetricsHandlerTest.java | 70 +++++ ...gregatingTaskManagersMetricsHandlerTest.java | 66 +++++ .../handler/legacy/metrics/MetricStoreTest.java | 21 ++ .../metrics/SubtaskMetricsHandlerTest.java | 92 +++++++ 21 files changed, 1351 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 1a6178f..fe5f106 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -63,10 +63,14 @@ import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsD import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingJobsMetricsHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingSubtasksMetricsHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingTaskManagersMetricsHandler; import org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler; import org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler; import org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler; import org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler; import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler; import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler; @@ -261,6 +265,20 @@ public class WebRuntimeMonitor implements WebMonitor { // job manager configuration get(router, new ClusterConfigHandler(scheduledExecutor, config)); + // metrics + get(router, new JobManagerMetricsHandler(scheduledExecutor, metricFetcher)); + + get(router, new AggregatingTaskManagersMetricsHandler(scheduledExecutor, metricFetcher)); + get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher)); + + get(router, new AggregatingJobsMetricsHandler(scheduledExecutor, metricFetcher)); + get(router, new JobMetricsHandler(scheduledExecutor, metricFetcher)); + + get(router, new JobVertexMetricsHandler(scheduledExecutor, metricFetcher)); + + get(router, new AggregatingSubtasksMetricsHandler(scheduledExecutor, metricFetcher)); + get(router, new SubtaskMetricsHandler(scheduledExecutor, metricFetcher)); + // overview over jobs get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, true)); get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, false)); @@ -275,7 +293,6 @@ public class WebRuntimeMonitor implements WebMonitor { get(router, new JobVertexTaskManagersHandler(executionGraphCache, scheduledExecutor, metricFetcher)); get(router, new JobVertexAccumulatorsHandler(executionGraphCache, scheduledExecutor)); get(router, new JobVertexBackPressureHandler(executionGraphCache, scheduledExecutor, backPressureStatsTracker, refreshInterval)); - get(router, new JobVertexMetricsHandler(scheduledExecutor, metricFetcher)); get(router, new SubtasksAllAccumulatorsHandler(executionGraphCache, scheduledExecutor)); get(router, new SubtaskCurrentAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); get(router, new SubtaskExecutionAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); @@ -285,7 +302,6 @@ public class WebRuntimeMonitor implements WebMonitor { get(router, new JobConfigHandler(executionGraphCache, scheduledExecutor)); get(router, new JobExceptionsHandler(executionGraphCache, scheduledExecutor)); get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); - get(router, new JobMetricsHandler(scheduledExecutor, metricFetcher)); get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); get(router, @@ -304,7 +320,6 @@ public class WebRuntimeMonitor implements WebMonitor { timeout, TaskManagerLogHandler.FileMode.STDOUT, config)); - get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher)); router // log and stdout @@ -318,8 +333,6 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile)); - get(router, new JobManagerMetricsHandler(scheduledExecutor, metricFetcher)); - // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) get(router, new JobCancellationHandler(scheduledExecutor, timeout)); // DELETE is the preferred way of canceling a job (Rest-conform) http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java index 5aa8312..7ae7621 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java @@ -51,6 +51,7 @@ import static org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttempt */ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler { + public static final String PARAMETER_SUBTASK_INDEX = "subtasknum"; private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt"; private final MetricFetcher fetcher; http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java new file mode 100644 index 0000000..9386a56 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.core.JsonGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics + * or the aggregated values of them across all/selected entities. + * + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned. + * {@code /metrics?get=X,Y&agg=min,max} + * The handler will then return a list of objects containing the aggregations for the requested metrics. + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]} + */ +abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private static final String PARAMETER_AGGREGATION = "agg"; + + private final MetricFetcher fetcher; + + AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor); + this.fetcher = Preconditions.checkNotNull(fetcher); + } + + protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters); + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + try { + fetcher.update(); + String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS); + String aggTypeList = queryParams.get(PARAMETER_AGGREGATION); + MetricStore store = fetcher.getMetricStore(); + + Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams); + if (stores == null){ + return "[]"; + } + + if (requestedMetricsList == null) { + Collection<String> list = getAvailableMetrics(stores); + return mapMetricListToJson(list); + } + + if (requestedMetricsList.isEmpty()) { + /* + * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a + * request for which the "get" parameter is an empty string. + */ + return "[]"; + } + + String[] requestedMetrics = requestedMetricsList.split(","); + + List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>(); + // by default we return all aggregations + if (aggTypeList == null || aggTypeList.isEmpty()) { + requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get()); + requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get()); + requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get()); + requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get()); + } else { + for (String aggregation : aggTypeList.split(",")) { + switch (aggregation.toLowerCase()) { + case DoubleAccumulator.DoubleMinimum.NAME: + requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get()); + break; + case DoubleAccumulator.DoubleMaximum.NAME: + requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get()); + break; + case DoubleAccumulator.DoubleSum.NAME: + requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get()); + break; + case DoubleAccumulator.DoubleAverage.NAME: + requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get()); + break; + default: + log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase()); + } + } + } + + return getAggregatedMetricValues(stores, requestedMetrics, requestedAggregationsFactories); + } catch (Exception e) { + throw new CompletionException(new FlinkException("Could not retrieve metrics.", e)); + } + }, + executor); + } + + /** + * Returns a JSON string containing a list of all available metrics in the given stores. Effectively this method maps + * the union of all key-sets to JSON. + * + * @param stores metrics + * @return JSON string containing a list of all available metrics + */ + private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) { + Set<String> uniqueMetrics = new HashSet<>(); + for (MetricStore.ComponentMetricStore store : stores) { + uniqueMetrics.addAll(store.metrics.keySet()); + } + return uniqueMetrics; + } + + private static String mapMetricListToJson(Collection<String> metrics) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartArray(); + for (String m : metrics) { + gen.writeStartObject(); + gen.writeStringField("id", m); + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); + } + + /** + * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string. + * + * @param stores available metrics + * @param requestedMetrics ids of requested metrics + * @param requestedAggregationsFactories requested aggregations + * @return JSON string containing the requested metrics + * @throws IOException + */ + private String getAggregatedMetricValues( + Collection<? extends MetricStore.ComponentMetricStore> stores, + String[] requestedMetrics, + List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartArray(); + for (String requestedMetric : requestedMetrics) { + final Collection<Double> values = new ArrayList<>(); + try { + for (MetricStore.ComponentMetricStore store : stores) { + String stringValue = store.metrics.get(requestedMetric); + if (stringValue != null) { + values.add(Double.valueOf(stringValue)); + } + } + } catch (NumberFormatException nfe) { + log.warn("The metric {} is not numeric and can't be aggregated.", requestedMetric, nfe); + // metric is not numeric so we can't perform aggregations => ignore it + continue; + } + if (!values.isEmpty()) { + + gen.writeStartObject(); + gen.writeStringField("id", requestedMetric); + for (DoubleAccumulator.DoubleAccumulatorFactory<?> accFactory : requestedAggregationsFactories) { + Iterator<Double> valuesIterator = values.iterator(); + DoubleAccumulator acc = accFactory.get(valuesIterator.next()); + valuesIterator.forEachRemaining(acc::add); + + gen.writeStringField(acc.getName(), String.valueOf(acc.getValue())); + } + gen.writeEndObject(); + } + } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java index 186397b..87d47c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java @@ -39,12 +39,15 @@ import java.util.concurrent.Executor; * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code [ { "id" : "X" } ] } * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler { + + public static final String PARAMETER_METRICS = "get"; + private final MetricFetcher fetcher; public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) { @@ -57,7 +60,7 @@ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler return CompletableFuture.supplyAsync( () -> { fetcher.update(); - String requestedMetricsList = queryParams.get("get"); + String requestedMetricsList = queryParams.get(PARAMETER_METRICS); try { return requestedMetricsList != null ? getMetricsValues(pathParams, requestedMetricsList) http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandler.java new file mode 100644 index 0000000..3fe921e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across all jobs, a list of all available metrics or the values + * for a set of metrics. + * + * <p>Specific jobs can be selected for aggregation by specifying a comma-separated list of job IDs. + * {@code /metrics?get=X,Y&jobs=A,B} + */ +public class AggregatingJobsMetricsHandler extends AbstractAggregatingMetricsHandler { + public AggregatingJobsMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); + } + + @Override + protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) { + String jobsList = queryParameters.get("jobs"); + if (jobsList == null || jobsList.isEmpty()) { + return store.getJobs().values(); + } else { + String[] jobs = jobsList.split(","); + Collection<MetricStore.ComponentMetricStore> jobStores = new ArrayList<>(); + for (String job : jobs) { + jobStores.add(store.getJobMetricStore(job)); + } + return jobStores; + } + } + + @Override + public String[] getPaths() { + return new String[]{"/jobs/metrics"}; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java new file mode 100644 index 0000000..e51b2ad --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.Executor; + +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID; +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID; + +/** + * Request handler that returns, aggregated across all subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. + * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler { + public AggregatingSubtasksMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); + } + + @Override + protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) { + String jobID = pathParameters.get(PARAMETER_JOB_ID); + String taskID = pathParameters.get(PARAMETER_VERTEX_ID); + if (jobID == null) { + return Collections.emptyList(); + } + if (taskID == null) { + return Collections.emptyList(); + } + String subtasksList = queryParameters.get("subtasks"); + if (subtasksList == null || subtasksList.isEmpty()) { + return store.getTaskMetricStore(jobID, taskID).getAllSubtaskMetricStores(); + } else { + Iterable<Integer> subtasks = getIntegerRangeFromString(subtasksList); + Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(); + for (int subtask : subtasks) { + subtaskStores.add(store.getSubtaskMetricStore(jobID, taskID, subtask)); + } + return subtaskStores; + } + } + + @Override + public String[] getPaths() { + return new String[]{"/jobs/:jobid/vertices/:vertexid/subtasks/metrics"}; + } + + private Iterable<Integer> getIntegerRangeFromString(String rangeDefinition) { + final String[] ranges = rangeDefinition.trim().split(","); + + UnionIterator<Integer> iterators = new UnionIterator<>(); + + for (String rawRange : ranges) { + try { + Iterator<Integer> rangeIterator; + String range = rawRange.trim(); + int dashIdx = range.indexOf('-'); + if (dashIdx == -1) { + // only one value in range: + rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator(); + } else { + // evaluate range + final int start = Integer.valueOf(range.substring(0, dashIdx)); + final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length())); + rangeIterator = new Iterator<Integer>() { + int i = start; + + @Override + public boolean hasNext() { + return i <= end; + } + + @Override + public Integer next() { + if (hasNext()) { + return i++; + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not supported"); + } + }; + } + iterators.add(rangeIterator); + } catch (NumberFormatException nfe) { + log.warn("Invalid value {} specified for integer range. Not a number.", rawRange, nfe); + } + } + + return iterators; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandler.java new file mode 100644 index 0000000..b7daa77 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across all task managers, a list of all available metrics or the values for + * a set of metrics. + * + * <p>Specific taskmanagers can be selected for aggregation by specifying a comma-separated list of taskmanager IDs. + * {@code /metrics?get=X,Y&taskmanagers=A,B} + */ +public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMetricsHandler { + public AggregatingTaskManagersMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); + } + + @Override + protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) { + String taskmanagersList = queryParameters.get("taskmanagers"); + if (taskmanagersList == null || taskmanagersList.isEmpty()) { + return store.getTaskManagers().values(); + } else { + String[] taskmanagers = taskmanagersList.split(","); + Collection<MetricStore.TaskManagerMetricStore> taskmanagerStores = new ArrayList<>(); + for (String taskmanager : taskmanagers) { + taskmanagerStores.add(store.getTaskManagerMetricStore(taskmanager)); + } + return taskmanagerStores; + } + } + + @Override + public String[] getPaths() { + return new String[]{"/taskmanagers/metrics"}; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java new file mode 100644 index 0000000..2c5e593 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +/** + * An interface for accumulating double values. + */ +interface DoubleAccumulator { + + /** + * Adds the given value to this accumulator. + * + * @param value value to add + */ + void add(double value); + + /** + * Returns the current value of this accumulator. + * + * @return current value of this accumulator + */ + double getValue(); + + /** + * Returns the name of this accumulator type. This name is used as a suffix for exposed metrics. + * + * @return name of this accumulator type + */ + String getName(); + + /** + * A factory for {@link DoubleAccumulator}s. This allows us to regenerate a new set of accumulators for each metrics + * without re-evaluating the "agg" query parameter or re-using existing accumulators. + * + * @param <A> DoubleAccumulator subclass + */ + interface DoubleAccumulatorFactory<A extends DoubleAccumulator> { + /** + * Creates a new accumulator with the given initial value. + * + * @param init initial value + * @return new accumulator with the given initial value + */ + A get(double init); + } + + /** + * Factory for {@link DoubleMaximum}. + */ + final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> { + private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory(); + + private DoubleMaximumFactory(){ + } + + @Override + public DoubleMaximum get(double init) { + return new DoubleMaximum(init); + } + + public static DoubleMaximumFactory get() { + return INSTANCE; + } + } + + /** + * Factory for {@link DoubleMinimum}. + */ + final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> { + private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory(); + + private DoubleMinimumFactory(){ + } + + @Override + public DoubleMinimum get(double init) { + return new DoubleMinimum(init); + } + + public static DoubleMinimumFactory get() { + return INSTANCE; + } + } + + /** + * Factory for {@link DoubleSum}. + */ + final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> { + private static final DoubleSumFactory INSTANCE = new DoubleSumFactory(); + + private DoubleSumFactory(){ + } + + @Override + public DoubleSum get(double init) { + return new DoubleSum(init); + } + + public static DoubleSumFactory get() { + return INSTANCE; + } + } + + /** + * Factory for {@link DoubleAverage}. + */ + final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> { + private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory(); + + private DoubleAverageFactory(){ + } + + @Override + public DoubleAverage get(double init) { + return new DoubleAverage(init); + } + + public static DoubleAverageFactory get() { + return INSTANCE; + } + } + + /** + * {@link DoubleAccumulator} that returns the maximum value. + */ + final class DoubleMaximum implements DoubleAccumulator { + + public static final String NAME = "max"; + + private double value; + + private DoubleMaximum(double init) { + value = init; + } + + @Override + public void add(double value) { + this.value = Math.max(this.value, value); + } + + @Override + public double getValue() { + return value; + } + + @Override + public String getName() { + return NAME; + } + } + + /** + * {@link DoubleAccumulator} that returns the minimum value. + */ + final class DoubleMinimum implements DoubleAccumulator { + + public static final String NAME = "min"; + + private double value; + + private DoubleMinimum(double init) { + value = init; + } + + @Override + public void add(double value) { + this.value = Math.min(this.value, value); + } + + @Override + public double getValue() { + return value; + } + + @Override + public String getName() { + return NAME; + } + } + + /** + * {@link DoubleAccumulator} that returns the sum of all values. + */ + final class DoubleSum implements DoubleAccumulator { + + public static final String NAME = "sum"; + + private double value; + + private DoubleSum(double init) { + value = init; + } + + @Override + public void add(double value) { + this.value += value; + } + + @Override + public double getValue() { + return value; + } + + @Override + public String getName() { + return NAME; + } + } + + /** + * {@link DoubleAccumulator} that returns the average over all values. + */ + final class DoubleAverage implements DoubleAccumulator { + + public static final String NAME = "avg"; + + private double sum; + private int count; + + private DoubleAverage(double init) { + sum = init; + count = 1; + } + + @Override + public void add(double value) { + this.sum += value; + this.count++; + } + + @Override + public double getValue() { + return sum / count; + } + + @Override + public String getName() { + return NAME; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java index b0e56de..a20368e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java @@ -27,7 +27,7 @@ import java.util.concurrent.Executor; * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. * {@code /metrics?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java index eaedea8..a5639d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java @@ -27,7 +27,7 @@ import java.util.concurrent.Executor; * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. * {@code /metrics?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java index 30fee25..2d67489 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java @@ -27,11 +27,14 @@ import java.util.concurrent.Executor; * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. * {@code /metrics?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and is only kept for backwards-compatibility. */ +@Deprecated public class JobVertexMetricsHandler extends AbstractMetricsHandler { public static final String PARAMETER_VERTEX_ID = "vertexid"; private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics"; http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java index f9e79d3..473b9c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.ThreadSafe; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -125,12 +126,12 @@ public class MetricStore { * @param taskID task ID * @return ComponentMetricStore for given IDs, or null if no store for the given arguments exists */ - public synchronized ComponentMetricStore getTaskMetricStore(String jobID, String taskID) { + public synchronized TaskMetricStore getTaskMetricStore(String jobID, String taskID) { JobMetricStore job = jobID == null ? null : jobs.get(jobID); if (job == null || taskID == null) { return null; } - return ComponentMetricStore.unmodifiable(job.getTaskMetricStore(taskID)); + return TaskMetricStore.unmodifiable(job.getTaskMetricStore(taskID)); } /** @@ -218,11 +219,13 @@ public class MetricStore { QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; job = jobs.computeIfAbsent(operatorInfo.jobID, k -> new JobMetricStore()); task = job.tasks.computeIfAbsent(operatorInfo.vertexID, k -> new TaskMetricStore()); + subtask = task.subtasks.computeIfAbsent(operatorInfo.subtaskIndex, k -> new ComponentMetricStore()); /** * As the WebInterface does not account for operators (because it can't) we don't * divide by operator and instead use the concatenation of subtask index, operator name and metric name * as the name. */ + addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric); addMetric(task.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, metric); break; default: @@ -348,11 +351,33 @@ public class MetricStore { * Sub-structure containing metrics of a single Task. */ @ThreadSafe - private static class TaskMetricStore extends ComponentMetricStore { - private final Map<Integer, ComponentMetricStore> subtasks = new ConcurrentHashMap<>(); + public static class TaskMetricStore extends ComponentMetricStore { + private final Map<Integer, ComponentMetricStore> subtasks; + + private TaskMetricStore() { + this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>()); + } + + private TaskMetricStore(Map<String, String> metrics, Map<Integer, ComponentMetricStore> subtasks) { + super(metrics); + this.subtasks = checkNotNull(subtasks); + } public ComponentMetricStore getSubtaskMetricStore(int subtaskIndex) { return subtasks.get(subtaskIndex); } + + public Collection<ComponentMetricStore> getAllSubtaskMetricStores() { + return subtasks.values(); + } + + private static TaskMetricStore unmodifiable(TaskMetricStore source) { + if (source == null) { + return null; + } + return new TaskMetricStore( + unmodifiableMap(source.metrics), + unmodifiableMap(source.subtasks)); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java new file mode 100644 index 0000000..bc238e9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler; + +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across all subtasks of a single tasks, a list of all available metrics or the + * values for a set of metrics. + * + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } + * + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". + */ +public class SubtaskMetricsHandler extends AbstractMetricsHandler { + private static final String SUBTASK_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics"; + + public SubtaskMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); + } + + @Override + public String[] getPaths() { + return new String[]{SUBTASK_METRICS_REST_PATH}; + } + + @Override + protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { + String subtaskNumString = pathParams.get(SubtaskExecutionAttemptDetailsHandler.PARAMETER_SUBTASK_INDEX); + int subtaskNum; + try { + subtaskNum = Integer.valueOf(subtaskNumString); + } catch (NumberFormatException nfe) { + return null; + } + MetricStore.ComponentMetricStore subtask = metrics.getSubtaskMetricStore( + pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID), + pathParams.get(JobVertexMetricsHandler.PARAMETER_VERTEX_ID), + subtaskNum); + return subtask != null + ? subtask.metrics + : null; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java index 66702c9..1b7e98c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java @@ -29,7 +29,7 @@ import java.util.concurrent.Executor; * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. * {@code /metrics?get=X,Y} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandlerTest.java new file mode 100644 index 0000000..a7f526f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandlerTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * Tests for the {@link AbstractAggregatingMetricsHandler}. + */ +public class AbstractAggregatingMetricsHandlerTest { + private static final ObjectMapper mapper = new ObjectMapper(); + + private static AggregatingTaskManagersMetricsHandler handler; + + @BeforeClass + public static void setupHandler() { + MetricFetcher fetcher = new MetricFetcher( + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); + MetricStoreTest.setupStore(fetcher.getMetricStore()); + + handler = new AggregatingTaskManagersMetricsHandler(Executors.directExecutor(), fetcher); + } + + @Test + public void testListMetrics() throws Exception { + String response = handler.handleJsonRequest(Collections.emptyMap(), Collections.emptyMap(), null) + .get(); + + Assert.assertEquals("[{\"id\":\"abc.metric22\"},{\"id\":\"abc.metric2b\"},{\"id\":\"abc.metric2\"}]", response); + } + + @Test + public void testMinAggregation() throws Exception { + Map<String, String> queryParams = new HashMap<>(); + queryParams.put("get", "abc.metric2"); + queryParams.put("agg", "min"); + String response = handler.handleJsonRequest(Collections.emptyMap(), queryParams, null) + .get(); + + JsonNode responseObject = mapper.readTree(response); + Assert.assertEquals(1, responseObject.size()); + JsonNode metricInfo = responseObject.get(0); + Assert.assertEquals("abc.metric2", metricInfo.get("id").asText()); + Assert.assertEquals(1, metricInfo.get("min").asDouble(), 0.01); + } + + @Test + public void testMaxAggregation() throws Exception { + Map<String, String> queryParams = new HashMap<>(); + queryParams.put("get", "abc.metric2"); + queryParams.put("agg", "max"); + String response = handler.handleJsonRequest(Collections.emptyMap(), queryParams, null) + .get(); + + JsonNode responseObject = mapper.readTree(response); + Assert.assertEquals(1, responseObject.size()); + JsonNode metricInfo = responseObject.get(0); + Assert.assertEquals("abc.metric2", metricInfo.get("id").asText()); + Assert.assertEquals(10, metricInfo.get("max").asDouble(), 0.01); + } + + @Test + public void testAvgAggregation() throws Exception { + Map<String, String> queryParams = new HashMap<>(); + queryParams.put("get", "abc.metric2"); + queryParams.put("agg", "avg"); + String response = handler.handleJsonRequest(Collections.emptyMap(), queryParams, null) + .get(); + + JsonNode responseObject = mapper.readTree(response); + Assert.assertEquals(1, responseObject.size()); + JsonNode metricInfo = responseObject.get(0); + Assert.assertEquals("abc.metric2", metricInfo.get("id").asText()); + Assert.assertEquals(5.5, metricInfo.get("avg").asDouble(), 0.01); + } + + @Test + public void testSumAggregation() throws Exception { + Map<String, String> queryParams = new HashMap<>(); + queryParams.put("get", "abc.metric2"); + queryParams.put("agg", "sum"); + String response = handler.handleJsonRequest(Collections.emptyMap(), queryParams, null) + .get(); + + JsonNode responseObject = mapper.readTree(response); + Assert.assertEquals(1, responseObject.size()); + JsonNode metricInfo = responseObject.get(0); + Assert.assertEquals("abc.metric2", metricInfo.get("id").asText()); + Assert.assertEquals(11, metricInfo.get("sum").asDouble(), 0.01); + } + + @Test + public void testMultipleAggregations() throws Exception { + Map<String, String> queryParams = new HashMap<>(); + queryParams.put("get", "abc.metric2"); + queryParams.put("agg", "sum,max,min,avg"); + String response = handler.handleJsonRequest(Collections.emptyMap(), queryParams, null) + .get(); + + JsonNode array = mapper.readTree(response); + Assert.assertEquals(1, array.size()); + + FullMetricInfo mappedResponse = mapper.treeToValue(array.get(0), FullMetricInfo.class); + Assert.assertEquals("abc.metric2", mappedResponse.id); + Assert.assertEquals(11, mappedResponse.sum, 0.01); + Assert.assertEquals(1, mappedResponse.min, 0.01); + Assert.assertEquals(10, mappedResponse.max, 0.01); + Assert.assertEquals(5.5, mappedResponse.avg, 0.01); + } + + @Test + public void testDefaultAggregations() throws Exception { + Map<String, String> queryParams = new HashMap<>(); + queryParams.put("get", "abc.metric2"); + String response = handler.handleJsonRequest(Collections.emptyMap(), queryParams, null) + .get(); + + JsonNode array = mapper.readTree(response); + Assert.assertEquals(1, array.size()); + + FullMetricInfo mappedResponse = mapper.treeToValue(array.get(0), FullMetricInfo.class); + Assert.assertEquals("abc.metric2", mappedResponse.id); + Assert.assertEquals(11, mappedResponse.sum, 0.01); + Assert.assertEquals(1, mappedResponse.min, 0.01); + Assert.assertEquals(10, mappedResponse.max, 0.01); + Assert.assertEquals(5.5, mappedResponse.avg, 0.01); + } + + private static class FullMetricInfo { + private final String id; + private final double min; + private final double max; + private final double avg; + private final double sum; + + @JsonCreator + private FullMetricInfo( + @JsonProperty("id") String id, + @JsonProperty("min") double min, + @JsonProperty("max") double max, + @JsonProperty("avg") double avg, + @JsonProperty("sum") double sum) { + this.id = id; + this.min = min; + this.max = max; + this.avg = avg; + this.sum = sum; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java index 8c38e79..19dd3e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java @@ -63,6 +63,8 @@ public class AbstractMetricsHandlerTest extends TestLogger { assertEquals("[" + "{\"id\":\"8.opname.abc.metric6\"}," + "{\"id\":\"8.opname.abc.metric7\"}," + + "{\"id\":\"1.opname.abc.metric6\"}," + + "{\"id\":\"1.opname.abc.metric7\"}," + "{\"id\":\"8.abc.metric5\"}" + "]", availableList); http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandlerTest.java new file mode 100644 index 0000000..5493750 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandlerTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * Tests for the {@link AggregatingJobsMetricsHandler}. + */ +public class AggregatingJobsMetricsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + AggregatingJobsMetricsHandler handler = new AggregatingJobsMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class)); + String[] paths = handler.getPaths(); + assertEquals(1, paths.length); + assertEquals("/jobs/metrics", paths[0]); + } + + @Test + public void getStores() throws Exception { + MetricFetcher fetcher = new MetricFetcher( + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); + MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); + + AggregatingJobsMetricsHandler handler = new AggregatingJobsMetricsHandler(Executors.directExecutor(), fetcher); + + Map<String, String> pathParams = new HashMap<>(); + Map<String, String> queryParams = new HashMap<>(); + + Collection<? extends MetricStore.ComponentMetricStore> stores = handler.getStores(store, pathParams, queryParams); + + assertEquals(2, stores.size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandlerTest.java new file mode 100644 index 0000000..fc04b63 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandlerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID; +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID; +import static org.junit.Assert.assertEquals; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * Tests for the {@link AggregatingSubtasksMetricsHandler}. + */ +public class AggregatingSubtasksMetricsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + AggregatingSubtasksMetricsHandler handler = new AggregatingSubtasksMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class)); + String[] paths = handler.getPaths(); + assertEquals(1, paths.length); + assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/metrics", paths[0]); + } + + @Test + public void getStores() throws Exception { + MetricFetcher fetcher = new MetricFetcher( + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); + MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); + + AggregatingSubtasksMetricsHandler handler = new AggregatingSubtasksMetricsHandler(Executors.directExecutor(), fetcher); + + Map<String, String> pathParams = new HashMap<>(); + pathParams.put(PARAMETER_JOB_ID, "jobid"); + pathParams.put(PARAMETER_VERTEX_ID, "taskid"); + Map<String, String> queryParams = new HashMap<>(); + + Collection<? extends MetricStore.ComponentMetricStore> stores = handler.getStores(store, pathParams, queryParams); + + assertEquals(2, stores.size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandlerTest.java new file mode 100644 index 0000000..bb514af --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandlerTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * Tests for the {@link AggregatingTaskManagersMetricsHandler}. + */ +public class AggregatingTaskManagersMetricsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + AggregatingTaskManagersMetricsHandler handler = new AggregatingTaskManagersMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class)); + String[] paths = handler.getPaths(); + assertEquals(1, paths.length); + assertEquals("/taskmanagers/metrics", paths[0]); + } + + @Test + public void getStores() throws Exception { + MetricFetcher fetcher = new MetricFetcher( + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); + MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); + + AggregatingTaskManagersMetricsHandler handler = new AggregatingTaskManagersMetricsHandler(Executors.directExecutor(), fetcher); + + Map<String, String> pathParams = new HashMap<>(); + Map<String, String> queryParams = new HashMap<>(); + + Collection<? extends MetricStore.ComponentMetricStore> stores = handler.getStores(store, pathParams, queryParams); + + assertEquals(2, stores.size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java index 82c6894..1d038ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java @@ -69,11 +69,20 @@ public class MetricStoreTest extends TestLogger { QueryScopeInfo.TaskManagerQueryScopeInfo tm = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc"); MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm, "metric2", 1); + MetricDump.CounterDump cd2a = new MetricDump.CounterDump(tm, "metric22", 1); + + QueryScopeInfo.TaskManagerQueryScopeInfo tm2 = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid2", "abc"); + MetricDump.CounterDump cd22 = new MetricDump.CounterDump(tm2, "metric2", 10); + MetricDump.CounterDump cd22a = new MetricDump.CounterDump(tm2, "metric2b", 10); QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc"); MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2); MetricDump.CounterDump cd4 = new MetricDump.CounterDump(job, "metric4", 3); + QueryScopeInfo.JobQueryScopeInfo job2 = new QueryScopeInfo.JobQueryScopeInfo("jobid2", "abc"); + MetricDump.CounterDump cd32 = new MetricDump.CounterDump(job2, "metric3", 2); + MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, "metric4", 3); + QueryScopeInfo.TaskQueryScopeInfo task = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc"); MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, "metric5", 4); @@ -81,14 +90,26 @@ public class MetricStoreTest extends TestLogger { MetricDump.CounterDump cd6 = new MetricDump.CounterDump(operator, "metric6", 5); MetricDump.CounterDump cd7 = new MetricDump.CounterDump(operator, "metric7", 6); + QueryScopeInfo.OperatorQueryScopeInfo operator2 = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, "opname", "abc"); + MetricDump.CounterDump cd62 = new MetricDump.CounterDump(operator2, "metric6", 5); + MetricDump.CounterDump cd72 = new MetricDump.CounterDump(operator2, "metric7", 6); + store.add(cd1); store.add(cd2); + store.add(cd2a); store.add(cd3); store.add(cd4); store.add(cd5); store.add(cd6); store.add(cd7); + store.add(cd62); + store.add(cd72); + store.add(cd22); + store.add(cd22a); + store.add(cd32); + store.add(cd42); + return store; } } http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandlerTest.java new file mode 100644 index 0000000..152060d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandlerTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler.PARAMETER_SUBTASK_INDEX; +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID; +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * Tests for the {@link SubtaskMetricsHandler}. + */ +public class SubtaskMetricsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + SubtaskMetricsHandler handler = new SubtaskMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics", paths[0]); + } + + @Test + public void getMapFor() throws Exception { + MetricFetcher fetcher = new MetricFetcher( + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); + MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); + + SubtaskMetricsHandler handler = new SubtaskMetricsHandler(Executors.directExecutor(), fetcher); + + Map<String, String> pathParams = new HashMap<>(); + pathParams.put(PARAMETER_JOB_ID, "jobid"); + pathParams.put(PARAMETER_VERTEX_ID, "taskid"); + pathParams.put(PARAMETER_SUBTASK_INDEX, "8"); + + Map<String, String> metrics = handler.getMapFor(pathParams, store); + + assertEquals("4", metrics.get("abc.metric5")); + assertEquals("5", metrics.get("opname.abc.metric6")); + assertEquals("6", metrics.get("opname.abc.metric7")); + } + + @Test + public void getMapForNull() { + MetricFetcher fetcher = new MetricFetcher( + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); + MetricStore store = fetcher.getMetricStore(); + + SubtaskMetricsHandler handler = new SubtaskMetricsHandler(Executors.directExecutor(), fetcher); + + Map<String, String> pathParams = new HashMap<>(); + + Map<String, String> metrics = handler.getMapFor(pathParams, store); + + assertNull(metrics); + } +}
