[FLINK-7781][metrics][REST] Support on-demand aggregations

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d48b208a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d48b208a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d48b208a

Branch: refs/heads/master
Commit: d48b208a7207bc50ba76019b7cb71039573bd26e
Parents: 4207eaa
Author: zentol <[email protected]>
Authored: Wed Oct 25 11:21:28 2017 +0200
Committer: zentol <[email protected]>
Committed: Tue Oct 31 11:53:44 2017 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  23 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |   1 +
 .../AbstractAggregatingMetricsHandler.java      | 224 ++++++++++++++++
 .../legacy/metrics/AbstractMetricsHandler.java  |   9 +-
 .../metrics/AggregatingJobsMetricsHandler.java  |  57 ++++
 .../AggregatingSubtasksMetricsHandler.java      | 122 +++++++++
 .../AggregatingTaskManagersMetricsHandler.java  |  57 ++++
 .../legacy/metrics/DoubleAccumulator.java       | 257 +++++++++++++++++++
 .../metrics/JobManagerMetricsHandler.java       |   2 +-
 .../legacy/metrics/JobMetricsHandler.java       |   2 +-
 .../legacy/metrics/JobVertexMetricsHandler.java |   5 +-
 .../handler/legacy/metrics/MetricStore.java     |  33 ++-
 .../legacy/metrics/SubtaskMetricsHandler.java   |  70 +++++
 .../metrics/TaskManagerMetricsHandler.java      |   2 +-
 .../AbstractAggregatingMetricsHandlerTest.java  | 186 ++++++++++++++
 .../metrics/AbstractMetricsHandlerTest.java     |   2 +
 .../AggregatingJobsMetricsHandlerTest.java      |  66 +++++
 .../AggregatingSubtasksMetricsHandlerTest.java  |  70 +++++
 ...gregatingTaskManagersMetricsHandlerTest.java |  66 +++++
 .../handler/legacy/metrics/MetricStoreTest.java |  21 ++
 .../metrics/SubtaskMetricsHandlerTest.java      |  92 +++++++
 21 files changed, 1351 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 1a6178f..fe5f106 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -63,10 +63,14 @@ import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsD
 import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingJobsMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingSubtasksMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingTaskManagersMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import 
org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
@@ -261,6 +265,20 @@ public class WebRuntimeMonitor implements WebMonitor {
                // job manager configuration
                get(router, new ClusterConfigHandler(scheduledExecutor, 
config));
 
+               // metrics
+               get(router, new JobManagerMetricsHandler(scheduledExecutor, 
metricFetcher));
+
+               get(router, new 
AggregatingTaskManagersMetricsHandler(scheduledExecutor, metricFetcher));
+               get(router, new TaskManagerMetricsHandler(scheduledExecutor, 
metricFetcher));
+
+               get(router, new 
AggregatingJobsMetricsHandler(scheduledExecutor, metricFetcher));
+               get(router, new JobMetricsHandler(scheduledExecutor, 
metricFetcher));
+
+               get(router, new JobVertexMetricsHandler(scheduledExecutor, 
metricFetcher));
+
+               get(router, new 
AggregatingSubtasksMetricsHandler(scheduledExecutor, metricFetcher));
+               get(router, new SubtaskMetricsHandler(scheduledExecutor, 
metricFetcher));
+
                // overview over jobs
                get(router, new CurrentJobsOverviewHandler(scheduledExecutor, 
DEFAULT_REQUEST_TIMEOUT, true, true));
                get(router, new CurrentJobsOverviewHandler(scheduledExecutor, 
DEFAULT_REQUEST_TIMEOUT, true, false));
@@ -275,7 +293,6 @@ public class WebRuntimeMonitor implements WebMonitor {
                get(router, new 
JobVertexTaskManagersHandler(executionGraphCache, scheduledExecutor, 
metricFetcher));
                get(router, new 
JobVertexAccumulatorsHandler(executionGraphCache, scheduledExecutor));
                get(router, new 
JobVertexBackPressureHandler(executionGraphCache, scheduledExecutor, 
backPressureStatsTracker, refreshInterval));
-               get(router, new JobVertexMetricsHandler(scheduledExecutor, 
metricFetcher));
                get(router, new 
SubtasksAllAccumulatorsHandler(executionGraphCache, scheduledExecutor));
                get(router, new 
SubtaskCurrentAttemptDetailsHandler(executionGraphCache, scheduledExecutor, 
metricFetcher));
                get(router, new 
SubtaskExecutionAttemptDetailsHandler(executionGraphCache, scheduledExecutor, 
metricFetcher));
@@ -285,7 +302,6 @@ public class WebRuntimeMonitor implements WebMonitor {
                get(router, new JobConfigHandler(executionGraphCache, 
scheduledExecutor));
                get(router, new JobExceptionsHandler(executionGraphCache, 
scheduledExecutor));
                get(router, new JobAccumulatorsHandler(executionGraphCache, 
scheduledExecutor));
-               get(router, new JobMetricsHandler(scheduledExecutor, 
metricFetcher));
 
                get(router, new TaskManagersHandler(scheduledExecutor, 
DEFAULT_REQUEST_TIMEOUT, metricFetcher));
                get(router,
@@ -304,7 +320,6 @@ public class WebRuntimeMonitor implements WebMonitor {
                                timeout,
                                TaskManagerLogHandler.FileMode.STDOUT,
                                config));
-               get(router, new TaskManagerMetricsHandler(scheduledExecutor, 
metricFetcher));
 
                router
                        // log and stdout
@@ -318,8 +333,6 @@ public class WebRuntimeMonitor implements WebMonitor {
                        .GET("/jobmanager/stdout", logFiles.stdOutFile == null 
? new ConstantTextHandler("(stdout file unavailable)") :
                                new StaticFileServerHandler<>(retriever, 
localRestAddress, timeout, logFiles.stdOutFile));
 
-               get(router, new JobManagerMetricsHandler(scheduledExecutor, 
metricFetcher));
-
                // Cancel a job via GET (for proper integration with YARN this 
has to be performed via GET)
                get(router, new JobCancellationHandler(scheduledExecutor, 
timeout));
                // DELETE is the preferred way of canceling a job (Rest-conform)

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
index 5aa8312..7ae7621 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
@@ -51,6 +51,7 @@ import static 
org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttempt
  */
 public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemptRequestHandler {
 
+       public static final String PARAMETER_SUBTASK_INDEX = "subtasknum";
        private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt";
 
        private final MetricFetcher fetcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java
new file mode 100644
index 0000000..9386a56
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return 
either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated 
list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the aggregated values of the 
requested metrics.
+ * {@code [ { "id" : "X", "min" : "1", "max" : "2", "sum" : "3", "avg" : "1.5" }, 
{ "id" : "Y", ... } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be 
calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all 
aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations 
for the requested metrics.
+ * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", 
"max" : "10" } ] }
+ */
+abstract class AbstractAggregatingMetricsHandler extends 
AbstractJsonRequestHandler {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       private static final String PARAMETER_AGGREGATION = "agg";
+
+       private final MetricFetcher fetcher;
+
+       AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
+               super(executor);
+               this.fetcher = Preconditions.checkNotNull(fetcher);
+       }
+
+       protected abstract Collection<? extends 
MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, 
String> pathParameters, Map<String, String> queryParameters);
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       fetcher.update();
+                                       String requestedMetricsList = 
queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
+                                       String aggTypeList = 
queryParams.get(PARAMETER_AGGREGATION);
+                                       MetricStore store = 
fetcher.getMetricStore();
+
+                                       Collection<? extends 
MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, 
queryParams);
+                                       if (stores == null){
+                                               return "[]";
+                                       }
+
+                                       if (requestedMetricsList == null) {
+                                               Collection<String> list = 
getAvailableMetrics(stores);
+                                               return 
mapMetricListToJson(list);
+                                       }
+
+                                       if (requestedMetricsList.isEmpty()) {
+                                               /*
+                                                * The WebInterface doesn't 
check whether the list of available metrics was empty. This can lead to a
+                                                * request for which the "get" 
parameter is an empty string.
+                                                */
+                                               return "[]";
+                                       }
+
+                                       String[] requestedMetrics = 
requestedMetricsList.split(",");
+
+                                       
List<DoubleAccumulator.DoubleAccumulatorFactory<?>> 
requestedAggregationsFactories = new ArrayList<>();
+                                       // by default we return all aggregations
+                                       if (aggTypeList == null || 
aggTypeList.isEmpty()) {
+                                               
requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+                                               
requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+                                               
requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+                                               
requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+                                       } else {
+                                               for (String aggregation : 
aggTypeList.split(",")) {
+                                                       switch 
(aggregation.toLowerCase()) {
+                                                               case 
DoubleAccumulator.DoubleMinimum.NAME:
+                                                                       
requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+                                                                       break;
+                                                               case 
DoubleAccumulator.DoubleMaximum.NAME:
+                                                                       
requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+                                                                       break;
+                                                               case 
DoubleAccumulator.DoubleSum.NAME:
+                                                                       
requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+                                                                       break;
+                                                               case 
DoubleAccumulator.DoubleAverage.NAME:
+                                                                       
requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+                                                                       break;
+                                                               default:
+                                                                       
log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
+                                                       }
+                                               }
+                                       }
+
+                                       return 
getAggregatedMetricValues(stores, requestedMetrics, 
requestedAggregationsFactories);
+                               } catch (Exception e) {
+                                       throw new CompletionException(new 
FlinkException("Could not retrieve metrics.", e));
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Returns the names of all metrics available in the given stores. 
Effectively this method computes the union
+        * of the key-sets of all stores; the result is not yet mapped to JSON.
+        *
+        * @param stores metric stores to inspect
+        * @return collection of unique metric names across all given stores
+        */
+       private static Collection<String> getAvailableMetrics(Collection<? 
extends MetricStore.ComponentMetricStore> stores) {
+               Set<String> uniqueMetrics = new HashSet<>();
+               for (MetricStore.ComponentMetricStore store : stores) {
+                       uniqueMetrics.addAll(store.metrics.keySet());
+               }
+               return uniqueMetrics;
+       }
+
+       private static String mapMetricListToJson(Collection<String> metrics) 
throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartArray();
+               for (String m : metrics) {
+                       gen.writeStartObject();
+                       gen.writeStringField("id", m);
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+
+               gen.close();
+               return writer.toString();
+       }
+
+       /**
+        * Extracts and aggregates all requested metrics from the given metric 
stores, and maps the result to a JSON string.
+        *
+        * @param stores available metrics
+        * @param requestedMetrics ids of requested metrics
+        * @param requestedAggregationsFactories requested aggregations
+        * @return JSON string containing the requested metrics
+        * @throws IOException if writing the JSON output fails
+        */
+       private String getAggregatedMetricValues(
+                       Collection<? extends MetricStore.ComponentMetricStore> 
stores,
+                       String[] requestedMetrics,
+                       List<DoubleAccumulator.DoubleAccumulatorFactory<?>> 
requestedAggregationsFactories) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartArray();
+               for (String requestedMetric : requestedMetrics) {
+                       final Collection<Double> values = new ArrayList<>();
+                       try {
+                               for (MetricStore.ComponentMetricStore store : 
stores) {
+                                       String stringValue = 
store.metrics.get(requestedMetric);
+                                       if (stringValue != null) {
+                                               
values.add(Double.valueOf(stringValue));
+                                       }
+                               }
+                       } catch (NumberFormatException nfe) {
+                               log.warn("The metric {} is not numeric and 
can't be aggregated.", requestedMetric, nfe);
+                               // metric is not numeric so we can't perform 
aggregations => ignore it
+                               continue;
+                       }
+                       if (!values.isEmpty()) {
+
+                               gen.writeStartObject();
+                               gen.writeStringField("id", requestedMetric);
+                               for 
(DoubleAccumulator.DoubleAccumulatorFactory<?> accFactory : 
requestedAggregationsFactories) {
+                                       Iterator<Double> valuesIterator = 
values.iterator();
+                                       DoubleAccumulator acc = 
accFactory.get(valuesIterator.next());
+                                       
valuesIterator.forEachRemaining(acc::add);
+
+                                       gen.writeStringField(acc.getName(), 
String.valueOf(acc.getValue()));
+                               }
+                               gen.writeEndObject();
+                       }
+               }
+               gen.writeEndArray();
+
+               gen.close();
+               return writer.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
index 186397b..87d47c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
@@ -39,12 +39,15 @@ import java.util.concurrent.Executor;
  * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
  * {@code [ { "id" : "X" } ] }
  *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
- * {@code /get?X,Y}
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated 
list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
  * The handler will then return a list containing the values of the requested 
metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
 public abstract class AbstractMetricsHandler extends 
AbstractJsonRequestHandler {
+
+       public static final String PARAMETER_METRICS = "get";
+
        private final MetricFetcher fetcher;
 
        public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) 
{
@@ -57,7 +60,7 @@ public abstract class AbstractMetricsHandler extends 
AbstractJsonRequestHandler
                return CompletableFuture.supplyAsync(
                        () -> {
                                fetcher.update();
-                               String requestedMetricsList = 
queryParams.get("get");
+                               String requestedMetricsList = 
queryParams.get(PARAMETER_METRICS);
                                try {
                                        return requestedMetricsList != null
                                                ? getMetricsValues(pathParams, 
requestedMetricsList)

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandler.java
new file mode 100644
index 0000000..3fe921e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across all jobs, a list of all 
available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific jobs can be selected for aggregation by specifying a 
comma-separated list of job IDs.
+ * {@code /jobs/metrics?get=X,Y&jobs=A,B}
+ */
+public class AggregatingJobsMetricsHandler extends 
AbstractAggregatingMetricsHandler {
+       public AggregatingJobsMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
+               super(executor, fetcher);
+       }
+
+       @Override
+       protected Collection<? extends MetricStore.ComponentMetricStore> 
getStores(MetricStore store, Map<String, String> pathParameters, Map<String, 
String> queryParameters) {
+               String jobsList = queryParameters.get("jobs");
+               if (jobsList == null || jobsList.isEmpty()) {
+                       return store.getJobs().values();
+               } else {
+                       String[] jobs = jobsList.split(",");
+                       Collection<MetricStore.ComponentMetricStore> jobStores 
= new ArrayList<>();
+                       for (String job : jobs) {
+                               jobStores.add(store.getJobMetricStore(job));
+                       }
+                       return jobStores;
+               }
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{"/jobs/metrics"};
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java
new file mode 100644
index 0000000..e51b2ad
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+
+/**
+ * Request handler that returns, aggregated across all subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ *
+ * <p>Jobs, tasks and subtasks for which the {@link MetricStore} holds no data are skipped instead of failing
+ * the request.
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
+       public AggregatingSubtasksMetricsHandler(Executor executor, MetricFetcher fetcher) {
+               super(executor, fetcher);
+       }
+
+       /**
+        * Returns the metric stores of the subtasks that take part in the aggregation.
+        *
+        * @param store metric store to read from
+        * @param pathParameters path parameters, expected to contain the job ID and the vertex ID
+        * @param queryParameters query parameters; a non-empty "subtasks" entry restricts the selection
+        * @return stores of the selected subtasks; empty if an ID is missing or nothing is stored for it
+        */
+       @Override
+       protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) {
+               String jobID = pathParameters.get(PARAMETER_JOB_ID);
+               String taskID = pathParameters.get(PARAMETER_VERTEX_ID);
+               if (jobID == null || taskID == null) {
+                       return Collections.emptyList();
+               }
+               String subtasksList = queryParameters.get("subtasks");
+               if (subtasksList == null || subtasksList.isEmpty()) {
+                       // the store returns null for an unknown job/task; dereferencing it blindly would throw an NPE
+                       MetricStore.TaskMetricStore taskStore = store.getTaskMetricStore(jobID, taskID);
+                       if (taskStore == null) {
+                               return Collections.emptyList();
+                       }
+                       return taskStore.getAllSubtaskMetricStores();
+               } else {
+                       Iterable<Integer> subtasks = getIntegerRangeFromString(subtasksList);
+                       Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>();
+                       for (int subtask : subtasks) {
+                               MetricStore.ComponentMetricStore subtaskStore = store.getSubtaskMetricStore(jobID, taskID, subtask);
+                               // requested indices without a store are left out rather than passed on as null
+                               if (subtaskStore != null) {
+                                       subtaskStores.add(subtaskStore);
+                               }
+                       }
+                       return subtaskStores;
+               }
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{"/jobs/:jobid/vertices/:vertexid/subtasks/metrics"};
+       }
+
+       /**
+        * Parses a comma-separated list of single indices and inclusive index ranges, e.g. {@code 0-2,4}.
+        *
+        * <p>Entries that are not valid integers are logged and ignored. A range whose start is greater than its
+        * end contributes no elements.
+        *
+        * @param rangeDefinition comma-separated list of indices and {@code start-end} ranges
+        * @return all indices described by the definition, in the order of the definition
+        */
+       private Iterable<Integer> getIntegerRangeFromString(String rangeDefinition) {
+               final String[] ranges = rangeDefinition.trim().split(",");
+
+               UnionIterator<Integer> iterators = new UnionIterator<>();
+
+               for (String rawRange : ranges) {
+                       try {
+                               Iterator<Integer> rangeIterator;
+                               String range = rawRange.trim();
+                               int dashIdx = range.indexOf('-');
+                               if (dashIdx == -1) {
+                                       // only one value in range:
+                                       rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
+                               } else {
+                                       // evaluate range; both bounds are inclusive
+                                       final int start = Integer.parseInt(range.substring(0, dashIdx));
+                                       final int end = Integer.parseInt(range.substring(dashIdx + 1, range.length()));
+                                       rangeIterator = new Iterator<Integer>() {
+                                               int i = start;
+
+                                               @Override
+                                               public boolean hasNext() {
+                                                       return i <= end;
+                                               }
+
+                                               @Override
+                                               public Integer next() {
+                                                       if (hasNext()) {
+                                                               return i++;
+                                                       } else {
+                                                               throw new NoSuchElementException();
+                                                       }
+                                               }
+
+                                               @Override
+                                               public void remove() {
+                                                       throw new UnsupportedOperationException("Remove not supported");
+                                               }
+                                       };
+                               }
+                               iterators.add(rangeIterator);
+                       } catch (NumberFormatException nfe) {
+                               log.warn("Invalid value {} specified for integer range. Not a number.", rawRange, nfe);
+                       }
+               }
+
+               return iterators;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandler.java
new file mode 100644
index 0000000..b7daa77
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across all task managers, either the list of all available metrics
+ * or the values for a set of metrics.
+ *
+ * <p>The aggregation can be restricted to specific taskmanagers by passing a comma-separated list of
+ * taskmanager IDs.
+ * {@code /metrics?get=X,Y&taskmanagers=A,B}
+ */
+public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMetricsHandler {
+       public AggregatingTaskManagersMetricsHandler(Executor executor, MetricFetcher fetcher) {
+               super(executor, fetcher);
+       }
+
+       /**
+        * Selects the taskmanager-level metric stores that take part in the aggregation.
+        *
+        * <p>Without a non-empty "taskmanagers" query parameter all taskmanagers of the store are selected;
+        * otherwise one store is looked up per comma-separated ID, in the order in which the IDs were given.
+        */
+       @Override
+       protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) {
+               final String requested = queryParameters.get("taskmanagers");
+               final boolean filtered = requested != null && !requested.isEmpty();
+               if (!filtered) {
+                       return store.getTaskManagers().values();
+               }
+
+               // NOTE(review): the lookup result is added as-is, including whatever is returned for an unknown ID -- confirm
+               Collection<MetricStore.TaskManagerMetricStore> selected = new ArrayList<>();
+               for (String taskManagerId : requested.split(",")) {
+                       selected.add(store.getTaskManagerMetricStore(taskManagerId));
+               }
+               return selected;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{"/taskmanagers/metrics"};
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java
new file mode 100644
index 0000000..2c5e593
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+/**
+ * Folds a sequence of double values into a single double result.
+ */
+interface DoubleAccumulator {
+
+       /**
+        * Folds the given value into this accumulator.
+        *
+        * @param value value to add
+        */
+       void add(double value);
+
+       /**
+        * Returns the result over all values seen so far, including the initial value.
+        *
+        * @return current value of this accumulator
+        */
+       double getValue();
+
+       /**
+        * Returns the name of this kind of accumulator. This name is used as a suffix for exposed metrics.
+        *
+        * @return name of this accumulator type
+        */
+       String getName();
+
+       /**
+        * Creates {@link DoubleAccumulator}s of one particular kind. Handing out a factory allows a fresh set of
+        * accumulators to be created for each metric without re-evaluating the "agg" query parameter or re-using
+        * existing accumulators.
+        *
+        * @param <A> DoubleAccumulator subclass
+        */
+       interface DoubleAccumulatorFactory<A extends DoubleAccumulator> {
+               /**
+                * Creates a new accumulator that starts out with the given value.
+                *
+                * @param init initial value
+                * @return new accumulator with the given initial value
+                */
+               A get(double init);
+       }
+
+       /**
+        * Singleton factory producing {@link DoubleMaximum} accumulators.
+        */
+       final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> {
+               private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory();
+
+               private DoubleMaximumFactory() {
+               }
+
+               public static DoubleMaximumFactory get() {
+                       return INSTANCE;
+               }
+
+               @Override
+               public DoubleMaximum get(double init) {
+                       return new DoubleMaximum(init);
+               }
+       }
+
+       /**
+        * Singleton factory producing {@link DoubleMinimum} accumulators.
+        */
+       final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> {
+               private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory();
+
+               private DoubleMinimumFactory() {
+               }
+
+               public static DoubleMinimumFactory get() {
+                       return INSTANCE;
+               }
+
+               @Override
+               public DoubleMinimum get(double init) {
+                       return new DoubleMinimum(init);
+               }
+       }
+
+       /**
+        * Singleton factory producing {@link DoubleSum} accumulators.
+        */
+       final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> {
+               private static final DoubleSumFactory INSTANCE = new DoubleSumFactory();
+
+               private DoubleSumFactory() {
+               }
+
+               public static DoubleSumFactory get() {
+                       return INSTANCE;
+               }
+
+               @Override
+               public DoubleSum get(double init) {
+                       return new DoubleSum(init);
+               }
+       }
+
+       /**
+        * Singleton factory producing {@link DoubleAverage} accumulators.
+        */
+       final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> {
+               private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory();
+
+               private DoubleAverageFactory() {
+               }
+
+               public static DoubleAverageFactory get() {
+                       return INSTANCE;
+               }
+
+               @Override
+               public DoubleAverage get(double init) {
+                       return new DoubleAverage(init);
+               }
+       }
+
+       /**
+        * {@link DoubleAccumulator} whose result is the largest value seen.
+        */
+       final class DoubleMaximum implements DoubleAccumulator {
+
+               public static final String NAME = "max";
+
+               private double max;
+
+               private DoubleMaximum(double init) {
+                       max = init;
+               }
+
+               @Override
+               public void add(double value) {
+                       max = Math.max(max, value);
+               }
+
+               @Override
+               public double getValue() {
+                       return max;
+               }
+
+               @Override
+               public String getName() {
+                       return NAME;
+               }
+       }
+
+       /**
+        * {@link DoubleAccumulator} whose result is the smallest value seen.
+        */
+       final class DoubleMinimum implements DoubleAccumulator {
+
+               public static final String NAME = "min";
+
+               private double min;
+
+               private DoubleMinimum(double init) {
+                       min = init;
+               }
+
+               @Override
+               public void add(double value) {
+                       min = Math.min(min, value);
+               }
+
+               @Override
+               public double getValue() {
+                       return min;
+               }
+
+               @Override
+               public String getName() {
+                       return NAME;
+               }
+       }
+
+       /**
+        * {@link DoubleAccumulator} whose result is the sum of all values seen.
+        */
+       final class DoubleSum implements DoubleAccumulator {
+
+               public static final String NAME = "sum";
+
+               private double total;
+
+               private DoubleSum(double init) {
+                       total = init;
+               }
+
+               @Override
+               public void add(double value) {
+                       total += value;
+               }
+
+               @Override
+               public double getValue() {
+                       return total;
+               }
+
+               @Override
+               public String getName() {
+                       return NAME;
+               }
+       }
+
+       /**
+        * {@link DoubleAccumulator} whose result is the arithmetic mean of all values seen.
+        */
+       final class DoubleAverage implements DoubleAccumulator {
+
+               public static final String NAME = "avg";
+
+               private double total;
+               private int samples;
+
+               private DoubleAverage(double init) {
+                       // the initial value counts as the first sample
+                       total = init;
+                       samples = 1;
+               }
+
+               @Override
+               public void add(double value) {
+                       total += value;
+                       samples++;
+               }
+
+               @Override
+               public double getValue() {
+                       return total / samples;
+               }
+
+               @Override
+               public String getName() {
+                       return NAME;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
index b0e56de..a20368e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
@@ -27,7 +27,7 @@ import java.util.concurrent.Executor;
  * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated 
list of metric names is expected as a value.
  * {@code /metrics?get=X,Y}
  * The handler will then return a list containing the values of the requested 
metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
index eaedea8..a5639d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
@@ -27,7 +27,7 @@ import java.util.concurrent.Executor;
  * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated 
list of metric names is expected as a value.
  * {@code /metrics?get=X,Y}
  * The handler will then return a list containing the values of the requested 
metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
index 30fee25..2d67489 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
@@ -27,11 +27,14 @@ import java.util.concurrent.Executor;
  * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated 
list of metric names is expected as a value.
  * {@code /metrics?get=X,Y}
  * The handler will then return a list containing the values of the requested 
metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and is 
only kept for backwards-compatibility.
  */
+@Deprecated
 public class JobVertexMetricsHandler extends AbstractMetricsHandler {
        public static final String PARAMETER_VERTEX_ID = "vertexid";
        private static final String JOB_VERTEX_METRICS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/metrics";

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index f9e79d3..473b9c2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -125,12 +126,12 @@ public class MetricStore {
         * @param taskID task ID
         * @return ComponentMetricStore for given IDs, or null if no store for 
the given arguments exists
         */
-       public synchronized ComponentMetricStore getTaskMetricStore(String 
jobID, String taskID) {
+       public synchronized TaskMetricStore getTaskMetricStore(String jobID, 
String taskID) {
                JobMetricStore job = jobID == null ? null : jobs.get(jobID);
                if (job == null || taskID == null) {
                        return null;
                }
-               return 
ComponentMetricStore.unmodifiable(job.getTaskMetricStore(taskID));
+               return 
TaskMetricStore.unmodifiable(job.getTaskMetricStore(taskID));
        }
 
        /**
@@ -218,11 +219,13 @@ public class MetricStore {
                                        QueryScopeInfo.OperatorQueryScopeInfo 
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
                                        job = 
jobs.computeIfAbsent(operatorInfo.jobID, k -> new JobMetricStore());
                                        task = 
job.tasks.computeIfAbsent(operatorInfo.vertexID, k -> new TaskMetricStore());
+                                       subtask = 
task.subtasks.computeIfAbsent(operatorInfo.subtaskIndex, k -> new 
ComponentMetricStore());
                                        /**
                                         * As the WebInterface does not account 
for operators (because it can't) we don't
                                         * divide by operator and instead use 
the concatenation of subtask index, operator name and metric name
                                         * as the name.
                                         */
+                                       addMetric(subtask.metrics, 
operatorInfo.operatorName + "." + name, metric);
                                        addMetric(task.metrics, 
operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, 
metric);
                                        break;
                                default:
@@ -348,11 +351,33 @@ public class MetricStore {
         * Sub-structure containing metrics of a single Task.
         */
        @ThreadSafe
-       private static class TaskMetricStore extends ComponentMetricStore {
-               private final Map<Integer, ComponentMetricStore> subtasks = new 
ConcurrentHashMap<>();
+       public static class TaskMetricStore extends ComponentMetricStore {
+               private final Map<Integer, ComponentMetricStore> subtasks;
+
+               private TaskMetricStore() {
+                       this(new ConcurrentHashMap<>(), new 
ConcurrentHashMap<>());
+               }
+
+               private TaskMetricStore(Map<String, String> metrics, 
Map<Integer, ComponentMetricStore> subtasks) {
+                       super(metrics);
+                       this.subtasks = checkNotNull(subtasks);
+               }
 
                public ComponentMetricStore getSubtaskMetricStore(int 
subtaskIndex) {
                        return subtasks.get(subtaskIndex);
                }
+
+               public Collection<ComponentMetricStore> 
getAllSubtaskMetricStores() {
+                       return subtasks.values();
+               }
+
+               private static TaskMetricStore unmodifiable(TaskMetricStore 
source) {
+                       if (source == null) {
+                               return null;
+                       }
+                       return new TaskMetricStore(
+                               unmodifiableMap(source.metrics),
+                               unmodifiableMap(source.subtasks));
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java
new file mode 100644
index 0000000..bc238e9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, for a single subtask of a single task, a list of all available metrics or the
+ * values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The subtask is addressed purely through the path (job ID, vertex ID and subtask index). Metrics
+ * aggregated across the subtasks of a task are served by {@link AggregatingSubtasksMetricsHandler}.
+ */
+public class SubtaskMetricsHandler extends AbstractMetricsHandler {
+       private static final String SUBTASK_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics";
+
+       public SubtaskMetricsHandler(Executor executor, MetricFetcher fetcher) {
+               super(executor, fetcher);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_METRICS_REST_PATH};
+       }
+
+       /**
+        * Looks up the metrics of the subtask addressed by the path parameters.
+        *
+        * @param pathParams path parameters carrying the job ID, the vertex ID and the subtask index
+        * @param metrics metric store to read from
+        * @return metrics of the addressed subtask, or null if the subtask index is missing or not an integer,
+        *         or if the store has no entry for the addressed subtask
+        */
+       @Override
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+               String subtaskNumString = pathParams.get(SubtaskExecutionAttemptDetailsHandler.PARAMETER_SUBTASK_INDEX);
+               int subtaskNum;
+               try {
+                       // parseInt rejects null as well as malformed input with a NumberFormatException
+                       subtaskNum = Integer.parseInt(subtaskNumString);
+               } catch (NumberFormatException nfe) {
+                       return null;
+               }
+               MetricStore.ComponentMetricStore subtask = metrics.getSubtaskMetricStore(
+                       pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
+                       pathParams.get(JobVertexMetricsHandler.PARAMETER_VERTEX_ID),
+                       subtaskNum);
+               return subtask != null
+                       ? subtask.metrics
+                       : null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
index 66702c9..1b7e98c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Executor;
  * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated 
list of metric names is expected as a value.
  * {@code /metrics?get=X,Y}
  * The handler will then return a list containing the values of the requested 
metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandlerTest.java
new file mode 100644
index 0000000..a7f526f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandlerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for the {@link AbstractAggregatingMetricsHandler}.
+ *
+ * <p>The abstract handler is exercised through an {@link AggregatingTaskManagersMetricsHandler}
+ * whose metric store is filled by {@code MetricStoreTest.setupStore}. That fixture registers
+ * the counter {@code metric2} with the value 1 for task manager "tmid" and with the value 10 for
+ * task manager "tmid2", which is where the expected aggregates (min 1, max 10, sum 11, avg 5.5)
+ * come from.
+ */
+public class AbstractAggregatingMetricsHandlerTest extends TestLogger {
+       private static final ObjectMapper mapper = new ObjectMapper();
+
+       /** ID of the metric that is requested by all aggregation tests. */
+       private static final String METRIC_ID = "abc.metric2";
+
+       /** Tolerance used when comparing aggregated values. */
+       private static final double DELTA = 0.01;
+
+       private static AggregatingTaskManagersMetricsHandler handler;
+
+       /**
+        * Creates the handler under test on top of a fetcher whose store holds the shared fixture.
+        */
+       @BeforeClass
+       public static void setupHandler() {
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(GatewayRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
+               MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+               handler = new AggregatingTaskManagersMetricsHandler(Executors.directExecutor(), fetcher);
+       }
+
+       /**
+        * A request without query parameters must list the IDs of the available metrics.
+        */
+       @Test
+       public void testListMetrics() throws Exception {
+               String response = handler.handleJsonRequest(Collections.emptyMap(), Collections.emptyMap(), null)
+                       .get();
+
+               // NOTE(review): this compares the exact element order of the response; confirm that the
+               // handler guarantees a stable order.
+               Assert.assertEquals("[{\"id\":\"abc.metric22\"},{\"id\":\"abc.metric2b\"},{\"id\":\"abc.metric2\"}]", response);
+       }
+
+       @Test
+       public void testMinAggregation() throws Exception {
+               JsonNode metricInfo = requestMetric("min");
+               Assert.assertEquals(1, metricInfo.get("min").asDouble(), DELTA);
+       }
+
+       @Test
+       public void testMaxAggregation() throws Exception {
+               JsonNode metricInfo = requestMetric("max");
+               Assert.assertEquals(10, metricInfo.get("max").asDouble(), DELTA);
+       }
+
+       @Test
+       public void testAvgAggregation() throws Exception {
+               JsonNode metricInfo = requestMetric("avg");
+               Assert.assertEquals(5.5, metricInfo.get("avg").asDouble(), DELTA);
+       }
+
+       @Test
+       public void testSumAggregation() throws Exception {
+               JsonNode metricInfo = requestMetric("sum");
+               Assert.assertEquals(11, metricInfo.get("sum").asDouble(), DELTA);
+       }
+
+       /**
+        * All four aggregations are requested explicitly in a single "agg" parameter.
+        */
+       @Test
+       public void testMultipleAggregations() throws Exception {
+               JsonNode metricInfo = requestMetric("sum,max,min,avg");
+               assertAllAggregations(mapper.treeToValue(metricInfo, FullMetricInfo.class));
+       }
+
+       /**
+        * Without an "agg" parameter the response is expected to contain all four aggregations.
+        */
+       @Test
+       public void testDefaultAggregations() throws Exception {
+               JsonNode metricInfo = requestMetric(null);
+               assertAllAggregations(mapper.treeToValue(metricInfo, FullMetricInfo.class));
+       }
+
+       /**
+        * Requests {@link #METRIC_ID} from the handler and returns the only element of the response.
+        *
+        * <p>Asserts that the response array has exactly one element and that this element carries
+        * the requested ID.
+        *
+        * @param aggregations value of the "agg" query parameter, or {@code null} to omit it
+        * @return the JSON object describing the requested metric
+        */
+       private static JsonNode requestMetric(String aggregations) throws Exception {
+               Map<String, String> queryParams = new HashMap<>();
+               queryParams.put("get", METRIC_ID);
+               if (aggregations != null) {
+                       queryParams.put("agg", aggregations);
+               }
+               String response = handler.handleJsonRequest(Collections.emptyMap(), queryParams, null)
+                       .get();
+
+               JsonNode responseArray = mapper.readTree(response);
+               Assert.assertEquals(1, responseArray.size());
+               JsonNode metricInfo = responseArray.get(0);
+               Assert.assertEquals(METRIC_ID, metricInfo.get("id").asText());
+               return metricInfo;
+       }
+
+       /**
+        * Checks the ID and all four aggregates of a deserialized response element.
+        */
+       private static void assertAllAggregations(FullMetricInfo info) {
+               Assert.assertEquals(METRIC_ID, info.id);
+               Assert.assertEquals(11, info.sum, DELTA);
+               Assert.assertEquals(1, info.min, DELTA);
+               Assert.assertEquals(10, info.max, DELTA);
+               Assert.assertEquals(5.5, info.avg, DELTA);
+       }
+
+       /**
+        * Jackson-deserializable view of a response element that carries every aggregation.
+        */
+       private static class FullMetricInfo {
+               private final String id;
+               private final double min;
+               private final double max;
+               private final double avg;
+               private final double sum;
+
+               @JsonCreator
+               private FullMetricInfo(
+                       @JsonProperty("id") String id,
+                       @JsonProperty("min") double min,
+                       @JsonProperty("max") double max,
+                       @JsonProperty("avg") double avg,
+                       @JsonProperty("sum") double sum) {
+                       this.id = id;
+                       this.min = min;
+                       this.max = max;
+                       this.avg = avg;
+                       this.sum = sum;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java
index 8c38e79..19dd3e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java
@@ -63,6 +63,8 @@ public class AbstractMetricsHandlerTest extends TestLogger {
                assertEquals("[" +
                                "{\"id\":\"8.opname.abc.metric6\"}," +
                                "{\"id\":\"8.opname.abc.metric7\"}," +
+                               "{\"id\":\"1.opname.abc.metric6\"}," +
+                               "{\"id\":\"1.opname.abc.metric7\"}," +
                                "{\"id\":\"8.abc.metric5\"}" +
                                "]",
                        availableList);

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandlerTest.java
new file mode 100644
index 0000000..5493750
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingJobsMetricsHandlerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for the {@link AggregatingJobsMetricsHandler}.
+ */
+public class AggregatingJobsMetricsHandlerTest extends TestLogger {
+
+       /**
+        * The handler must report {@code /jobs/metrics} as its only path.
+        */
+       @Test
+       public void testGetPaths() {
+               final MetricFetcher fetcherMock = mock(MetricFetcher.class);
+               final String[] registeredPaths = new AggregatingJobsMetricsHandler(Executors.directExecutor(), fetcherMock).getPaths();
+
+               assertEquals(1, registeredPaths.length);
+               assertEquals("/jobs/metrics", registeredPaths[0]);
+       }
+
+       /**
+        * The {@code MetricStoreTest} fixture adds metrics for the jobs "jobid" and "jobid2";
+        * two stores are expected for it.
+        */
+       @Test
+       public void getStores() throws Exception {
+               final MetricFetcher metricFetcher = createFetcher();
+               final MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+               final AggregatingJobsMetricsHandler handler = new AggregatingJobsMetricsHandler(Executors.directExecutor(), metricFetcher);
+
+               final Map<String, String> noPathParams = new HashMap<>();
+               final Map<String, String> noQueryParams = new HashMap<>();
+               final Collection<? extends MetricStore.ComponentMetricStore> stores = handler.getStores(populatedStore, noPathParams, noQueryParams);
+
+               assertEquals(2, stores.size());
+       }
+
+       /**
+        * Builds a fetcher from mocked retrievers, a direct executor and the testing timeout.
+        */
+       private static MetricFetcher createFetcher() {
+               return new MetricFetcher(
+                       mock(GatewayRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandlerTest.java
new file mode 100644
index 0000000..fc04b63
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+import static org.junit.Assert.assertEquals;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for the {@link AggregatingSubtasksMetricsHandler}.
+ */
+public class AggregatingSubtasksMetricsHandlerTest extends TestLogger {
+
+       /**
+        * The handler must report the subtask aggregation path as its only path.
+        */
+       @Test
+       public void testGetPaths() {
+               final MetricFetcher fetcherMock = mock(MetricFetcher.class);
+               final String[] registeredPaths = new AggregatingSubtasksMetricsHandler(Executors.directExecutor(), fetcherMock).getPaths();
+
+               assertEquals(1, registeredPaths.length);
+               assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/metrics", registeredPaths[0]);
+       }
+
+       /**
+        * The {@code MetricStoreTest} fixture adds metrics for the subtasks 8 and 1 of vertex
+        * "taskid" in job "jobid"; two stores are expected for that job/vertex pair.
+        */
+       @Test
+       public void getStores() throws Exception {
+               final MetricFetcher metricFetcher = createFetcher();
+               final MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+               final AggregatingSubtasksMetricsHandler handler = new AggregatingSubtasksMetricsHandler(Executors.directExecutor(), metricFetcher);
+
+               final Map<String, String> queryParams = new HashMap<>();
+               final Map<String, String> pathParams = new HashMap<>();
+               pathParams.put(PARAMETER_JOB_ID, "jobid");
+               pathParams.put(PARAMETER_VERTEX_ID, "taskid");
+
+               final Collection<? extends MetricStore.ComponentMetricStore> stores = handler.getStores(populatedStore, pathParams, queryParams);
+
+               assertEquals(2, stores.size());
+       }
+
+       /**
+        * Builds a fetcher from mocked retrievers, a direct executor and the testing timeout.
+        */
+       private static MetricFetcher createFetcher() {
+               return new MetricFetcher(
+                       mock(GatewayRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandlerTest.java
new file mode 100644
index 0000000..bb514af
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingTaskManagersMetricsHandlerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for the {@link AggregatingTaskManagersMetricsHandler}.
+ */
+public class AggregatingTaskManagersMetricsHandlerTest extends TestLogger {
+
+       /**
+        * The handler must report {@code /taskmanagers/metrics} as its only path.
+        */
+       @Test
+       public void testGetPaths() {
+               final MetricFetcher fetcherMock = mock(MetricFetcher.class);
+               final String[] registeredPaths = new AggregatingTaskManagersMetricsHandler(Executors.directExecutor(), fetcherMock).getPaths();
+
+               assertEquals(1, registeredPaths.length);
+               assertEquals("/taskmanagers/metrics", registeredPaths[0]);
+       }
+
+       /**
+        * The {@code MetricStoreTest} fixture adds metrics for the task managers "tmid" and
+        * "tmid2"; two stores are expected for it.
+        */
+       @Test
+       public void getStores() throws Exception {
+               final MetricFetcher metricFetcher = createFetcher();
+               final MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+               final AggregatingTaskManagersMetricsHandler handler = new AggregatingTaskManagersMetricsHandler(Executors.directExecutor(), metricFetcher);
+
+               final Map<String, String> noPathParams = new HashMap<>();
+               final Map<String, String> noQueryParams = new HashMap<>();
+               final Collection<? extends MetricStore.ComponentMetricStore> stores = handler.getStores(populatedStore, noPathParams, noQueryParams);
+
+               assertEquals(2, stores.size());
+       }
+
+       /**
+        * Builds a fetcher from mocked retrievers, a direct executor and the testing timeout.
+        */
+       private static MetricFetcher createFetcher() {
+               return new MetricFetcher(
+                       mock(GatewayRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 82c6894..1d038ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -69,11 +69,20 @@ public class MetricStoreTest extends TestLogger {
 
                QueryScopeInfo.TaskManagerQueryScopeInfo tm = new 
QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
                MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm, 
"metric2", 1);
+               MetricDump.CounterDump cd2a = new MetricDump.CounterDump(tm, 
"metric22", 1);
+
+               QueryScopeInfo.TaskManagerQueryScopeInfo tm2 = new 
QueryScopeInfo.TaskManagerQueryScopeInfo("tmid2", "abc");
+               MetricDump.CounterDump cd22 = new MetricDump.CounterDump(tm2, 
"metric2", 10);
+               MetricDump.CounterDump cd22a = new MetricDump.CounterDump(tm2, 
"metric2b", 10);
 
                QueryScopeInfo.JobQueryScopeInfo job = new 
QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
                MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, 
"metric3", 2);
                MetricDump.CounterDump cd4 = new MetricDump.CounterDump(job, 
"metric4", 3);
 
+               QueryScopeInfo.JobQueryScopeInfo job2 = new 
QueryScopeInfo.JobQueryScopeInfo("jobid2", "abc");
+               MetricDump.CounterDump cd32 = new MetricDump.CounterDump(job2, 
"metric3", 2);
+               MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, 
"metric4", 3);
+
                QueryScopeInfo.TaskQueryScopeInfo task = new 
QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
                MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, 
"metric5", 4);
 
@@ -81,14 +90,26 @@ public class MetricStoreTest extends TestLogger {
                MetricDump.CounterDump cd6 = new 
MetricDump.CounterDump(operator, "metric6", 5);
                MetricDump.CounterDump cd7 = new 
MetricDump.CounterDump(operator, "metric7", 6);
 
+               QueryScopeInfo.OperatorQueryScopeInfo operator2 = new 
QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, "opname", "abc");
+               MetricDump.CounterDump cd62 = new 
MetricDump.CounterDump(operator2, "metric6", 5);
+               MetricDump.CounterDump cd72 = new 
MetricDump.CounterDump(operator2, "metric7", 6);
+
                store.add(cd1);
                store.add(cd2);
+               store.add(cd2a);
                store.add(cd3);
                store.add(cd4);
                store.add(cd5);
                store.add(cd6);
                store.add(cd7);
 
+               store.add(cd62);
+               store.add(cd72);
+               store.add(cd22);
+               store.add(cd22a);
+               store.add(cd32);
+               store.add(cd42);
+
                return store;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d48b208a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandlerTest.java
new file mode 100644
index 0000000..152060d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandlerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler.PARAMETER_SUBTASK_INDEX;
+import static 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static 
org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for the {@link SubtaskMetricsHandler}.
+ */
+public class SubtaskMetricsHandlerTest extends TestLogger {
+
+       /**
+        * The handler must report the per-subtask metrics path as its only path.
+        */
+       @Test
+       public void testGetPaths() {
+               final MetricFetcher fetcherMock = mock(MetricFetcher.class);
+               final String[] registeredPaths = new SubtaskMetricsHandler(Executors.directExecutor(), fetcherMock).getPaths();
+
+               Assert.assertEquals(1, registeredPaths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics", registeredPaths[0]);
+       }
+
+       /**
+        * Looks up subtask 8 of vertex "taskid" in job "jobid" in the {@code MetricStoreTest}
+        * fixture and checks the task-level and operator-level counters stored for it.
+        */
+       @Test
+       public void getMapFor() throws Exception {
+               final MetricFetcher metricFetcher = createFetcher();
+               final MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+               final SubtaskMetricsHandler handler = new SubtaskMetricsHandler(Executors.directExecutor(), metricFetcher);
+
+               final Map<String, String> pathParams = new HashMap<>();
+               pathParams.put(PARAMETER_JOB_ID, "jobid");
+               pathParams.put(PARAMETER_VERTEX_ID, "taskid");
+               pathParams.put(PARAMETER_SUBTASK_INDEX, "8");
+
+               final Map<String, String> subtaskMetrics = handler.getMapFor(pathParams, populatedStore);
+
+               assertEquals("4", subtaskMetrics.get("abc.metric5"));
+               assertEquals("5", subtaskMetrics.get("opname.abc.metric6"));
+               assertEquals("6", subtaskMetrics.get("opname.abc.metric7"));
+       }
+
+       /**
+        * With no path parameters and a store that was not filled by the fixture, the lookup
+        * must return {@code null}.
+        */
+       @Test
+       public void getMapForNull() {
+               final MetricFetcher metricFetcher = createFetcher();
+               final MetricStore untouchedStore = metricFetcher.getMetricStore();
+               final SubtaskMetricsHandler handler = new SubtaskMetricsHandler(Executors.directExecutor(), metricFetcher);
+
+               final Map<String, String> noPathParams = new HashMap<>();
+               final Map<String, String> subtaskMetrics = handler.getMapFor(noPathParams, untouchedStore);
+
+               assertNull(subtaskMetrics);
+       }
+
+       /**
+        * Builds a fetcher from mocked retrievers, a direct executor and the testing timeout.
+        */
+       private static MetricFetcher createFetcher() {
+               return new MetricFetcher(
+                       mock(GatewayRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
+       }
+}

Reply via email to