[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

This closes #5805.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0410d80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0410d80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0410d80

Branch: refs/heads/master
Commit: c0410d801e406e77b1e6e7134224f7946906a49f
Parents: 4645d3c
Author: zentol <ches...@apache.org>
Authored: Wed Mar 28 12:52:07 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Mon Apr 16 21:17:54 2018 +0200

----------------------------------------------------------------------
 .../AbstractAggregatingMetricsHandler.java      | 300 ++++++++++++++
 .../metrics/AggregatingJobsMetricsHandler.java  |  77 ++++
 .../AggregatingSubtasksMetricsHandler.java      | 119 ++++++
 .../AggregatingTaskManagersMetricsHandler.java  |  77 ++++
 .../handler/job/metrics/DoubleAccumulator.java  | 257 ++++++++++++
 .../AbstractAggregatedMetricsHeaders.java       |  50 +++
 .../AbstractAggregatedMetricsParameters.java    |  48 +++
 .../AggregateTaskManagerMetricsParameters.java  |  38 ++
 .../metrics/AggregatedJobMetricsHeaders.java    |  44 +++
 .../metrics/AggregatedJobMetricsParameters.java |  39 ++
 .../messages/job/metrics/AggregatedMetric.java  | 118 ++++++
 .../metrics/AggregatedMetricsResponseBody.java  | 112 ++++++
 .../AggregatedSubtaskMetricsHeaders.java        |  47 +++
 .../AggregatedSubtaskMetricsParameters.java     |  51 +++
 .../AggregatedTaskManagerMetricsHeaders.java    |  44 +++
 .../job/metrics/JobsFilterQueryParameter.java   |  48 +++
 .../metrics/MetricsAggregationParameter.java    |  58 +++
 .../metrics/SubtasksFilterQueryParameter.java   |  41 ++
 .../TaskManagersFilterQueryParameter.java       |  42 ++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  33 ++
 .../AggregatingJobsMetricsHandlerTest.java      |  81 ++++
 .../AggregatingMetricsHandlerTestBase.java      | 389 +++++++++++++++++++
 .../AggregatingSubtasksMetricsHandlerTest.java  |  93 +++++
 ...gregatingTaskManagersMetricsHandlerTest.java |  82 ++++
 24 files changed, 2288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
new file mode 100644
index 0000000..338bb46
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return 
either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated 
list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be 
calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all 
aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations 
for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", 
"max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends 
AbstractAggregatedMetricsParameters<?>> extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, 
AggregatedMetricsResponseBody, P> {
+
+       private final Executor executor;
+       private final MetricFetcher<?> fetcher;
+
+       protected AbstractAggregatingMetricsHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       Map<String, String> responseHeaders,
+                       AbstractAggregatedMetricsHeaders<P> messageHeaders,
+                       Executor executor,
+                       MetricFetcher<?> fetcher) {
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+               this.executor = Preconditions.checkNotNull(executor);
+               this.fetcher = Preconditions.checkNotNull(fetcher);
+       }
+
+       @Nonnull
+       abstract Collection<? extends MetricStore.ComponentMetricStore> 
getStores(MetricStore store, HandlerRequest<EmptyRequestBody, P> request);
+
+       @Override
+       protected CompletableFuture<AggregatedMetricsResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, P> request, @Nonnull 
RestfulGateway gateway) throws RestHandlerException {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       fetcher.update();
+                                       List<String> requestedMetrics = 
request.getQueryParameter(MetricsFilterParameter.class);
+                                       
List<MetricsAggregationParameter.AggregationMode> requestedAggregations = 
request.getQueryParameter(MetricsAggregationParameter.class);
+                                       MetricStore store = 
fetcher.getMetricStore();
+
+                                       Collection<? extends 
MetricStore.ComponentMetricStore> stores = getStores(store, request);
+
+                                       if (requestedMetrics.isEmpty()) {
+                                               Collection<String> list = 
getAvailableMetrics(stores);
+                                               return new 
AggregatedMetricsResponseBody(
+                                                       list.stream()
+                                                               
.map(AggregatedMetric::new)
+                                                               
.collect(Collectors.toList())
+                                               );
+                                       }
+
+                                       DoubleAccumulator.DoubleMinimumFactory 
minimumFactory = null;
+                                       DoubleAccumulator.DoubleMaximumFactory 
maximumFactory = null;
+                                       DoubleAccumulator.DoubleAverageFactory 
averageFactory = null;
+                                       DoubleAccumulator.DoubleSumFactory 
sumFactory = null;
+                                       // by default we return all aggregations
+                                       if (requestedAggregations.isEmpty()) {
+                                               minimumFactory = 
DoubleAccumulator.DoubleMinimumFactory.get();
+                                               maximumFactory = 
DoubleAccumulator.DoubleMaximumFactory.get();
+                                               averageFactory = 
DoubleAccumulator.DoubleAverageFactory.get();
+                                               sumFactory = 
DoubleAccumulator.DoubleSumFactory.get();
+                                       } else {
+                                               for 
(MetricsAggregationParameter.AggregationMode aggregation : 
requestedAggregations) {
+                                                       switch (aggregation) {
+                                                               case MIN:
+                                                                       
minimumFactory = DoubleAccumulator.DoubleMinimumFactory.get();
+                                                                       break;
+                                                               case MAX:
+                                                                       
maximumFactory = DoubleAccumulator.DoubleMaximumFactory.get();
+                                                                       break;
+                                                               case AVG:
+                                                                       
averageFactory = DoubleAccumulator.DoubleAverageFactory.get();
+                                                                       break;
+                                                               case SUM:
+                                                                       
sumFactory = DoubleAccumulator.DoubleSumFactory.get();
+                                                                       break;
+                                                               default:
+                                                                       
log.warn("Unsupported aggregation specified: {}", aggregation);
+                                                       }
+                                               }
+                                       }
+                                       MetricAccumulatorFactory 
metricAccumulatorFactory = new MetricAccumulatorFactory(minimumFactory, 
maximumFactory, averageFactory, sumFactory);
+
+                                       return 
getAggregatedMetricValues(stores, requestedMetrics, metricAccumulatorFactory);
+                               } catch (Exception e) {
+                                       log.warn("Could not retrieve metrics.", 
e);
+                                       throw new CompletionException(new 
RestHandlerException("Could not retrieve metrics.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR));
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Returns a JSON string containing a list of all available metrics in 
the given stores. Effectively this method maps
+        * the union of all key-sets to JSON.
+        *
+        * @param stores metrics
+        * @return JSON string containing a list of all available metrics
+        */
+       private static Collection<String> getAvailableMetrics(Collection<? 
extends MetricStore.ComponentMetricStore> stores) {
+               Set<String> uniqueMetrics = new HashSet<>(32);
+               for (MetricStore.ComponentMetricStore store : stores) {
+                       uniqueMetrics.addAll(store.metrics.keySet());
+               }
+               return uniqueMetrics;
+       }
+
+       /**
+        * Extracts and aggregates all requested metrics from the given metric 
stores, and maps the result to a JSON string.
+        *
+        * @param stores available metrics
+        * @param requestedMetrics ids of requested metrics
+        * @param requestedAggregationsFactories requested aggregations
+        * @return JSON string containing the requested metrics
+        */
+       private AggregatedMetricsResponseBody getAggregatedMetricValues(
+                       Collection<? extends MetricStore.ComponentMetricStore> 
stores,
+                       List<String> requestedMetrics,
+                       MetricAccumulatorFactory 
requestedAggregationsFactories) {
+
+               Collection<AggregatedMetric> aggregatedMetrics = new 
ArrayList<>(requestedMetrics.size());
+               for (String requestedMetric : requestedMetrics) {
+                       final Collection<Double> values = new 
ArrayList<>(stores.size());
+                       try {
+                               for (MetricStore.ComponentMetricStore store : 
stores) {
+                                       String stringValue = 
store.metrics.get(requestedMetric);
+                                       if (stringValue != null) {
+                                               
values.add(Double.valueOf(stringValue));
+                                       }
+                               }
+                       } catch (NumberFormatException nfe) {
+                               log.warn("The metric {} is not numeric and 
can't be aggregated.", requestedMetric, nfe);
+                               // metric is not numeric so we can't perform 
aggregations => ignore it
+                               continue;
+                       }
+                       if (!values.isEmpty()) {
+
+                               Iterator<Double> valuesIterator = 
values.iterator();
+                               MetricAccumulator acc = 
requestedAggregationsFactories.get(requestedMetric, valuesIterator.next());
+                               valuesIterator.forEachRemaining(acc::add);
+
+                               aggregatedMetrics.add(acc.get());
+                       } else {
+                               return new 
AggregatedMetricsResponseBody(Collections.emptyList());
+                       }
+               }
+               return new AggregatedMetricsResponseBody(aggregatedMetrics);
+       }
+
+       private static class MetricAccumulatorFactory {
+
+               @Nullable
+               private final DoubleAccumulator.DoubleMinimumFactory 
minimumFactory;
+
+               @Nullable
+               private final DoubleAccumulator.DoubleMaximumFactory 
maximumFactory;
+
+               @Nullable
+               private final DoubleAccumulator.DoubleAverageFactory 
averageFactory;
+
+               @Nullable
+               private final DoubleAccumulator.DoubleSumFactory sumFactory;
+
+               private MetricAccumulatorFactory(
+                               @Nullable 
DoubleAccumulator.DoubleMinimumFactory minimumFactory,
+                               @Nullable 
DoubleAccumulator.DoubleMaximumFactory maximumFactory,
+                               @Nullable 
DoubleAccumulator.DoubleAverageFactory averageFactory,
+                               @Nullable DoubleAccumulator.DoubleSumFactory 
sumFactory) {
+                       this.minimumFactory = minimumFactory;
+                       this.maximumFactory = maximumFactory;
+                       this.averageFactory = averageFactory;
+                       this.sumFactory = sumFactory;
+               }
+
+               MetricAccumulator get(String metricName, double init) {
+                       return new MetricAccumulator(
+                               metricName,
+                               minimumFactory == null ? null : 
minimumFactory.get(init),
+                               maximumFactory == null ? null : 
maximumFactory.get(init),
+                               averageFactory == null ? null : 
averageFactory.get(init),
+                               sumFactory == null ? null : sumFactory.get(init)
+                       );
+               }
+       }
+
+       private static class MetricAccumulator {
+               private final String metricName;
+
+               @Nullable
+               private final DoubleAccumulator min;
+               @Nullable
+               private final DoubleAccumulator max;
+               @Nullable
+               private final DoubleAccumulator avg;
+               @Nullable
+               private final DoubleAccumulator sum;
+
+               private MetricAccumulator(
+                               String metricName,
+                               @Nullable DoubleAccumulator min,
+                               @Nullable DoubleAccumulator max,
+                               @Nullable DoubleAccumulator avg,
+                               @Nullable DoubleAccumulator sum) {
+                       this.metricName = 
Preconditions.checkNotNull(metricName);
+                       this.min = min;
+                       this.max = max;
+                       this.avg = avg;
+                       this.sum = sum;
+               }
+
+               void add(double value) {
+                       if (min != null) {
+                               min.add(value);
+                       }
+                       if (max != null) {
+                               max.add(value);
+                       }
+                       if (avg != null) {
+                               avg.add(value);
+                       }
+                       if (sum != null) {
+                               sum.add(value);
+                       }
+               }
+
+               AggregatedMetric get() {
+                       return new AggregatedMetric(
+                               metricName,
+                               min == null ? null : min.getValue(),
+                               max == null ? null : max.getValue(),
+                               avg == null ? null : avg.getValue(),
+                               sum == null ? null : sum.getValue()
+                       );
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
new file mode 100644
index 0000000..42928a4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.JobsFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across jobs, a list of all 
available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific jobs can be selected for aggregation by specifying a 
comma-separated list of job IDs.
+ * {@code /metrics?get=X,Y&jobs=A,B}
+ */
+public class AggregatingJobsMetricsHandler extends 
AbstractAggregatingMetricsHandler<AggregatedJobMetricsParameters> {
+
+       public AggregatingJobsMetricsHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout, Map<String, String> responseHeaders,
+                       Executor executor,
+                       MetricFetcher<?> fetcher) {
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, AggregatedJobMetricsHeaders.getInstance(), executor, fetcher);
+       }
+
+       @Nonnull
+       @Override
+       Collection<? extends MetricStore.ComponentMetricStore> 
getStores(MetricStore store, HandlerRequest<EmptyRequestBody, 
AggregatedJobMetricsParameters> request) {
+               List<JobID> jobs = 
request.getQueryParameter(JobsFilterQueryParameter.class);
+               if (jobs.isEmpty()) {
+                       return store.getJobs().values();
+               } else {
+                       Collection<MetricStore.ComponentMetricStore> jobStores 
= new ArrayList<>(jobs.size());
+                       for (JobID job : jobs) {
+                               MetricStore.ComponentMetricStore jobMetricStore 
= store.getJobMetricStore(job.toString());
+                               if (jobMetricStore != null) {
+                                       jobStores.add(jobMetricStore);
+                               }
+                       }
+                       return jobStores;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
new file mode 100644
index 0000000..f95deaa
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.IntStream;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all 
available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a 
comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends 
AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+       public AggregatingSubtasksMetricsHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       Map<String, String> responseHeaders,
+                       Executor executor,
+                       MetricFetcher<?> fetcher) {
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, 
fetcher);
+       }
+
+       @Nonnull
+       @Override
+       Collection<? extends MetricStore.ComponentMetricStore> 
getStores(MetricStore store, HandlerRequest<EmptyRequestBody, 
AggregatedSubtaskMetricsParameters> request) {
+               JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+               JobVertexID taskID = 
request.getPathParameter(JobVertexIdPathParameter.class);
+
+               Collection<String> subtaskRanges = 
request.getQueryParameter(SubtasksFilterQueryParameter.class);
+               if (subtaskRanges.isEmpty()) {
+                       MetricStore.TaskMetricStore taskMetricStore = 
store.getTaskMetricStore(jobID.toString(), taskID.toString());
+                       if (taskMetricStore != null) {
+                               return 
taskMetricStore.getAllSubtaskMetricStores();
+                       } else {
+                               return Collections.emptyList();
+                       }
+               } else {
+                       Iterable<Integer> subtasks = 
getIntegerRangeFromString(subtaskRanges);
+                       Collection<MetricStore.ComponentMetricStore> 
subtaskStores = new ArrayList<>(8);
+                       for (int subtask : subtasks) {
+                               MetricStore.ComponentMetricStore 
subtaskMetricStore = store.getSubtaskMetricStore(jobID.toString(), 
taskID.toString(), subtask);
+                               if (subtaskMetricStore != null) {
+                                       subtaskStores.add(subtaskMetricStore);
+                               }
+                       }
+                       return subtaskStores;
+               }
+       }
+
+       private Iterable<Integer> getIntegerRangeFromString(Collection<String> 
ranges) {
+               UnionIterator<Integer> iterators = new UnionIterator<>();
+
+               for (String rawRange : ranges) {
+                       try {
+                               Iterator<Integer> rangeIterator;
+                               String range = rawRange.trim();
+                               int dashIdx = range.indexOf('-');
+                               if (dashIdx == -1) {
+                                       // only one value in range:
+                                       rangeIterator = 
Collections.singleton(Integer.valueOf(range)).iterator();
+                               } else {
+                                       // evaluate range
+                                       final int start = 
Integer.valueOf(range.substring(0, dashIdx));
+                                       final int end = 
Integer.valueOf(range.substring(dashIdx + 1, range.length()));
+                                       rangeIterator = 
IntStream.rangeClosed(start, end).iterator();
+                               }
+                               iterators.add(rangeIterator);
+                       } catch (NumberFormatException nfe) {
+                               log.warn("Invalid value {} specified for 
integer range. Not a number.", rawRange, nfe);
+                       }
+               }
+
+               return iterators;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
new file mode 100644
index 0000000..2e15cac
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedTaskManagerMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.TaskManagersFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across task managers, a list of 
all available metrics or the values for
+ * a set of metrics.
+ *
+ * <p>Specific taskmanagers can be selected for aggregation by specifying a 
comma-separated list of taskmanager IDs.
+ * {@code /metrics?get=X,Y&taskmanagers=A,B}
+ */
+public class AggregatingTaskManagersMetricsHandler extends 
AbstractAggregatingMetricsHandler<AggregateTaskManagerMetricsParameters> {
+
+       public AggregatingTaskManagersMetricsHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout, Map<String, String> responseHeaders,
+                       Executor executor,
+                       MetricFetcher<?> fetcher) {
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, AggregatedTaskManagerMetricsHeaders.getInstance(), executor, 
fetcher);
+       }
+
+       @Nonnull
+       @Override
+       Collection<? extends MetricStore.ComponentMetricStore> 
getStores(MetricStore store, HandlerRequest<EmptyRequestBody, 
AggregateTaskManagerMetricsParameters> request) {
+               List<ResourceID> taskmanagers = 
request.getQueryParameter(TaskManagersFilterQueryParameter.class);
+               if (taskmanagers.isEmpty()) {
+                       return store.getTaskManagers().values();
+               } else {
+                       Collection<MetricStore.TaskManagerMetricStore> 
taskmanagerStores = new ArrayList<>(taskmanagers.size());
+                       for (ResourceID taskmanager : taskmanagers) {
+                               MetricStore.TaskManagerMetricStore 
taskManagerMetricStore = 
store.getTaskManagerMetricStore(taskmanager.getResourceIdString());
+                               if (taskManagerMetricStore != null) {
+                                       
taskmanagerStores.add(taskManagerMetricStore);
+                               }
+                       }
+                       return taskmanagerStores;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
new file mode 100644
index 0000000..dc701d9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
+
+       /**
+        * Adds the given value to this accumulator.
+        *
+        * @param value value to add
+        */
+       void add(double value);
+
+       /**
+        * Returns the current value of this accumulator.
+        *
+        * @return current value of this accumulator
+        */
+       double getValue();
+
+       /**
+        * Returns the name of this accumulator type. This name is used as a 
suffix for exposed metrics.
+        *
+        * @return name of this accumulator type
+        */
+       String getName();
+
+       /**
+        * A factory for {@link DoubleAccumulator}s. This allows us to 
regenerate a new set of accumulators for each metrics
+        * without re-evaluating the "agg" query parameter or re-using existing 
accumulators.
+        *
+        * @param <A> DoubleAccumulator subclass
+        */
+       interface DoubleAccumulatorFactory<A extends DoubleAccumulator> {
+               /**
+                * Creates a new accumulator with the given initial value.
+                *
+                * @param init initial value
+                * @return new accumulator with the given initial value
+                */
+               A get(double init);
+       }
+
+       /**
+        * Factory for {@link DoubleMaximum}.
+        */
+       final class DoubleMaximumFactory implements 
DoubleAccumulatorFactory<DoubleMaximum> {
+               private static final DoubleMaximumFactory INSTANCE = new 
DoubleMaximumFactory();
+
+               private DoubleMaximumFactory(){
+               }
+
+               @Override
+               public DoubleMaximum get(double init) {
+                       return new DoubleMaximum(init);
+               }
+
+               public static DoubleMaximumFactory get() {
+                       return INSTANCE;
+               }
+       }
+
+       /**
+        * Factory for {@link DoubleMinimum}.
+        */
+       final class DoubleMinimumFactory implements 
DoubleAccumulatorFactory<DoubleMinimum> {
+               private static final DoubleMinimumFactory INSTANCE = new 
DoubleMinimumFactory();
+
+               private DoubleMinimumFactory(){
+               }
+
+               @Override
+               public DoubleMinimum get(double init) {
+                       return new DoubleMinimum(init);
+               }
+
+               public static DoubleMinimumFactory get() {
+                       return INSTANCE;
+               }
+       }
+
+       /**
+        * Factory for {@link DoubleSum}.
+        */
+       final class DoubleSumFactory implements 
DoubleAccumulatorFactory<DoubleSum> {
+               private static final DoubleSumFactory INSTANCE = new 
DoubleSumFactory();
+
+               private DoubleSumFactory(){
+               }
+
+               @Override
+               public DoubleSum get(double init) {
+                       return new DoubleSum(init);
+               }
+
+               public static DoubleSumFactory get() {
+                       return INSTANCE;
+               }
+       }
+
+       /**
+        * Factory for {@link DoubleAverage}.
+        */
+       final class DoubleAverageFactory implements 
DoubleAccumulatorFactory<DoubleAverage> {
+               private static final DoubleAverageFactory INSTANCE = new 
DoubleAverageFactory();
+
+               private DoubleAverageFactory(){
+               }
+
+               @Override
+               public DoubleAverage get(double init) {
+                       return new DoubleAverage(init);
+               }
+
+               public static DoubleAverageFactory get() {
+                       return INSTANCE;
+               }
+       }
+
+       /**
+        * {@link DoubleAccumulator} that returns the maximum value.
+        */
+       final class DoubleMaximum implements DoubleAccumulator {
+
+               public static final String NAME = "max";
+
+               private double value;
+
+               private DoubleMaximum(double init) {
+                       value = init;
+               }
+
+               @Override
+               public void add(double value) {
+                       this.value = Math.max(this.value, value);
+               }
+
+               @Override
+               public double getValue() {
+                       return value;
+               }
+
+               @Override
+               public String getName() {
+                       return NAME;
+               }
+       }
+
+       /**
+        * {@link DoubleAccumulator} that returns the minimum value.
+        */
+       final class DoubleMinimum implements DoubleAccumulator {
+
+               public static final String NAME = "min";
+
+               private double value;
+
+               private DoubleMinimum(double init) {
+                       value = init;
+               }
+
+               @Override
+               public void add(double value) {
+                       this.value = Math.min(this.value, value);
+               }
+
+               @Override
+               public double getValue() {
+                       return value;
+               }
+
+               @Override
+               public String getName() {
+                       return NAME;
+               }
+       }
+
+       /**
+        * {@link DoubleAccumulator} that returns the sum of all values.
+        */
+       final class DoubleSum implements DoubleAccumulator {
+
+               public static final String NAME = "sum";
+
+               private double value;
+
+               private DoubleSum(double init) {
+                       value = init;
+               }
+
+               @Override
+               public void add(double value) {
+                       this.value += value;
+               }
+
+               @Override
+               public double getValue() {
+                       return value;
+               }
+
+               @Override
+               public String getName() {
+                       return NAME;
+               }
+       }
+
+       /**
+        * {@link DoubleAccumulator} that returns the average over all values.
+        */
+       final class DoubleAverage implements DoubleAccumulator {
+
+               public static final String NAME = "avg";
+
+               private double sum;
+               private int count;
+
+               private DoubleAverage(double init) {
+                       sum = init;
+                       count = 1;
+               }
+
+               @Override
+               public void add(double value) {
+                       this.sum += value;
+                       this.count++;
+               }
+
+               @Override
+               public double getValue() {
+                       return sum / count;
+               }
+
+               @Override
+               public String getName() {
+                       return NAME;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
new file mode 100644
index 0000000..4100802
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Based {@link MessageHeaders} class for aggregating metrics.
+ */
+public abstract class AbstractAggregatedMetricsHeaders<P extends 
AbstractAggregatedMetricsParameters<?>> implements 
MessageHeaders<EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+       @Override
+       public Class<AggregatedMetricsResponseBody> getResponseClass() {
+               return AggregatedMetricsResponseBody.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
new file mode 100644
index 0000000..a07141d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base {@link MessageParameters} class for aggregating metrics.
+ */
+public abstract class AbstractAggregatedMetricsParameters<M extends 
MessageQueryParameter<?>> extends MessageParameters {
+       private final MetricsFilterParameter metrics = new 
MetricsFilterParameter();
+       private final MetricsAggregationParameter aggs = new 
MetricsAggregationParameter();
+       private final M selector;
+
+       AbstractAggregatedMetricsParameters(M selector) {
+               this.selector = selector;
+       }
+
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return Collections.unmodifiableCollection(Arrays.asList(
+                       metrics,
+                       aggs,
+                       selector
+               ));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
new file mode 100644
index 0000000..0a053e6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating task manager metrics.
+ */
+public class AggregateTaskManagerMetricsParameters extends 
AbstractAggregatedMetricsParameters<TaskManagersFilterQueryParameter> {
+       public AggregateTaskManagerMetricsParameters() {
+               super(new TaskManagersFilterQueryParameter());
+       }
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Collections.emptyList();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
new file mode 100644
index 0000000..265512e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+/**
+ * Headers for aggregating job metrics.
+ */
+public class AggregatedJobMetricsHeaders extends 
AbstractAggregatedMetricsHeaders<AggregatedJobMetricsParameters> {
+
+       private static final AggregatedJobMetricsHeaders INSTANCE = new 
AggregatedJobMetricsHeaders();
+
+       private AggregatedJobMetricsHeaders() {
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return "/jobs/metrics";
+       }
+
+       @Override
+       public AggregatedJobMetricsParameters getUnresolvedMessageParameters() {
+               return new AggregatedJobMetricsParameters();
+       }
+
+       public static AggregatedJobMetricsHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
new file mode 100644
index 0000000..25df609
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating job metrics.
+ */
+public class AggregatedJobMetricsParameters extends 
AbstractAggregatedMetricsParameters<JobsFilterQueryParameter> {
+
+       public AggregatedJobMetricsParameters() {
+               super(new JobsFilterQueryParameter());
+       }
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Collections.emptyList();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
new file mode 100644
index 0000000..acafc3a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for aggregated metrics. Contains the metric name and 
optionally the sum, average, minimum and maximum.
+ */
+public class AggregatedMetric {
+
+       private static final String FIELD_NAME_ID = "id";
+
+       private static final String FIELD_NAME_MIN = "min";
+
+       private static final String FIELD_NAME_MAX = "max";
+
+       private static final String FIELD_NAME_AVG = "avg";
+
+       private static final String FIELD_NAME_SUM = "sum";
+
+       @JsonProperty(value = FIELD_NAME_ID, required = true)
+       private final String id;
+
+       @JsonInclude(JsonInclude.Include.NON_NULL)
+       @JsonProperty(FIELD_NAME_MIN)
+       private final Double min;
+
+       @JsonInclude(JsonInclude.Include.NON_NULL)
+       @JsonProperty(FIELD_NAME_MAX)
+       private final Double max;
+
+       @JsonInclude(JsonInclude.Include.NON_NULL)
+       @JsonProperty(FIELD_NAME_AVG)
+       private final Double avg;
+
+       @JsonInclude(JsonInclude.Include.NON_NULL)
+       @JsonProperty(FIELD_NAME_SUM)
+       private final Double sum;
+
+       @JsonCreator
+       public AggregatedMetric(
+               final @JsonProperty(value = FIELD_NAME_ID, required = true) 
String id,
+               final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
+               final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
+               final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
+               final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum) {
+
+               this.id = requireNonNull(id, "id must not be null");
+               this.min = min;
+               this.max = max;
+               this.avg = avg;
+               this.sum = sum;
+       }
+
+       public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, 
required = true) String id) {
+               this(id, null, null, null, null);
+       }
+
+       @JsonIgnore
+       public String getId() {
+               return id;
+       }
+
+       @JsonIgnore
+       public Double getMin() {
+               return min;
+       }
+
+       @JsonIgnore
+       public Double getMax() {
+               return max;
+       }
+
+       @JsonIgnore
+       public Double getSum() {
+               return sum;
+       }
+
+       @JsonIgnore
+       public Double getAvg() {
+               return avg;
+       }
+
+       @Override
+       public String toString() {
+               return "AggregatedMetric{" +
+                       "id='" + id + '\'' +
+                       ", mim='" + min + '\'' +
+                       ", max='" + max + '\'' +
+                       ", avg='" + avg + '\'' +
+                       ", sum='" + sum + '\'' +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
new file mode 100644
index 0000000..b6b8dcc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Response type for a collection of aggregated metrics.
+ *
+ * <p>As JSON this type will be represented as an array of
+ * metrics, i.e., the field <code>metrics</code> will not show up. For 
example, a collection with a
+ * single metric will be represented as follows:
+ * <pre>
+ * {@code
+ * [{"id": "metricName", "min": "1"}]
+ * }
+ * </pre>
+ *
+ * @see AggregatedMetricsResponseBody.Serializer
+ * @see AggregatedMetricsResponseBody.Deserializer
+ * @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
+ */
+@JsonSerialize(using = AggregatedMetricsResponseBody.Serializer.class)
+@JsonDeserialize(using = AggregatedMetricsResponseBody.Deserializer.class)
+public class AggregatedMetricsResponseBody implements ResponseBody {
+
+       private final Collection<AggregatedMetric> metrics;
+
+       public AggregatedMetricsResponseBody(Collection<AggregatedMetric> 
metrics) {
+
+               this.metrics = metrics;
+       }
+
+       @JsonIgnore
+       public Collection<AggregatedMetric> getMetrics() {
+               return metrics;
+       }
+
+       /**
+        * JSON serializer for {@link AggregatedMetricsResponseBody}.
+        */
+       public static class Serializer extends 
StdSerializer<AggregatedMetricsResponseBody> {
+
+               private static final long serialVersionUID = 1L;
+
+               protected Serializer() {
+                       super(AggregatedMetricsResponseBody.class);
+               }
+
+               @Override
+               public void serialize(
+                       AggregatedMetricsResponseBody 
metricCollectionResponseBody,
+                       JsonGenerator jsonGenerator,
+                       SerializerProvider serializerProvider) throws 
IOException {
+
+                       
jsonGenerator.writeObject(metricCollectionResponseBody.getMetrics());
+               }
+       }
+
+       /**
+        * JSON deserializer for {@link AggregatedMetricsResponseBody}.
+        */
+       public static class Deserializer extends 
StdDeserializer<AggregatedMetricsResponseBody> {
+
+               private static final long serialVersionUID = 1L;
+
+               protected Deserializer() {
+                       super(AggregatedMetricsResponseBody.class);
+               }
+
+               @Override
+               public AggregatedMetricsResponseBody deserialize(
+                       JsonParser jsonParser,
+                       DeserializationContext deserializationContext) throws 
IOException {
+
+                       return new 
AggregatedMetricsResponseBody(jsonParser.readValueAs(
+                               new TypeReference<List<AggregatedMetric>>() {
+                               }));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
new file mode 100644
index 0000000..e1d0790
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+
+/**
+ * Headers for aggregating subtask metrics.
+ */
+public class AggregatedSubtaskMetricsHeaders extends 
AbstractAggregatedMetricsHeaders<AggregatedSubtaskMetricsParameters> {
+
+       private static final AggregatedSubtaskMetricsHeaders INSTANCE = new 
AggregatedSubtaskMetricsHeaders();
+
+       private AggregatedSubtaskMetricsHeaders() {
+       }
+
+       @Override
+       public AggregatedSubtaskMetricsParameters 
getUnresolvedMessageParameters() {
+               return new AggregatedSubtaskMetricsParameters();
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return "/jobs/" + JobIDPathParameter.KEY + "/vertices/" + 
JobVertexIdPathParameter.KEY + "/subtasks/metrics";
+       }
+
+       public static AggregatedSubtaskMetricsHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
new file mode 100644
index 0000000..34e1b52
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating subtask metrics.
+ */
+public class AggregatedSubtaskMetricsParameters extends 
AbstractAggregatedMetricsParameters<SubtasksFilterQueryParameter> {
+
+       private final JobIDPathParameter jobId = new JobIDPathParameter();
+       private final JobVertexIdPathParameter vertexId = new 
JobVertexIdPathParameter();
+       private final SubtaskIndexPathParameter subtaskIndex = new 
SubtaskIndexPathParameter();
+
+       public AggregatedSubtaskMetricsParameters() {
+               super(new SubtasksFilterQueryParameter());
+       }
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Collections.unmodifiableCollection(Arrays.asList(
+                       jobId,
+                       vertexId,
+                       subtaskIndex
+               ));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
new file mode 100644
index 0000000..5b5fe4c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+/**
+ * Headers for aggregating task manager metrics.
+ */
+public class AggregatedTaskManagerMetricsHeaders extends 
AbstractAggregatedMetricsHeaders<AggregateTaskManagerMetricsParameters> {
+
+       private static final AggregatedTaskManagerMetricsHeaders INSTANCE = new 
AggregatedTaskManagerMetricsHeaders();
+
+       private AggregatedTaskManagerMetricsHeaders() {
+       }
+
+       @Override
+       public AggregateTaskManagerMetricsParameters 
getUnresolvedMessageParameters() {
+               return new AggregateTaskManagerMetricsParameters();
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return "/taskmanagers/metrics";
+       }
+
+       public static AggregatedTaskManagerMetricsHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
new file mode 100644
index 0000000..fb57f87
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+
+/**
+ * {@link MessageQueryParameter} for selecting jobs when aggregating metrics.
+ */
+public class JobsFilterQueryParameter extends MessageQueryParameter<JobID> {
+
+       JobsFilterQueryParameter() {
+               super("jobs", MessageParameterRequisiteness.OPTIONAL);
+       }
+
+       @Override
+       public JobID convertStringToValue(String value) throws 
ConversionException {
+               try {
+                       return JobID.fromHexString(value);
+               } catch (IllegalArgumentException iae) {
+                       throw new ConversionException("Not a valid job ID: " + 
value, iae);
+               }
+       }
+
+       @Override
+       public String convertValueToString(JobID value) {
+               return value.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
new file mode 100644
index 0000000..1057788
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Locale;
+
+/**
+ * TODO: add javadoc.
+ */
+public class MetricsAggregationParameter extends 
MessageQueryParameter<MetricsAggregationParameter.AggregationMode> {
+
+       protected MetricsAggregationParameter() {
+               super("agg", MessageParameterRequisiteness.OPTIONAL);
+       }
+
+       @Override
+       public AggregationMode convertStringToValue(String value) throws 
ConversionException {
+               try {
+                       return 
AggregationMode.valueOf(value.toUpperCase(Locale.ROOT));
+               } catch (IllegalArgumentException iae) {
+                       throw new ConversionException("Not a valid aggregation: 
" + value, iae);
+               }
+       }
+
+       @Override
+       public String convertValueToString(AggregationMode value) {
+               return value.name().toLowerCase();
+       }
+
+       /**
+        * The available aggregations.
+        */
+       public enum AggregationMode {
+               MIN,
+               MAX,
+               SUM,
+               AVG
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
new file mode 100644
index 0000000..fe5d37e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * {@link MessageQueryParameter} for selecting subtasks when aggregating 
metrics.
+ */
+public class SubtasksFilterQueryParameter extends 
MessageQueryParameter<String> {
+
+       SubtasksFilterQueryParameter() {
+               super("subtasks", MessageParameterRequisiteness.OPTIONAL);
+       }
+
+       @Override
+       public String convertStringToValue(String value) {
+               return value;
+       }
+
+       @Override
+       public String convertValueToString(String value) {
+               return value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
new file mode 100644
index 0000000..dcd6934
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * {@link MessageQueryParameter} for selecting task managers when aggregating 
metrics.
+ */
+public class TaskManagersFilterQueryParameter extends 
MessageQueryParameter<ResourceID> {
+
+       TaskManagersFilterQueryParameter() {
+               super("taskmanagers", MessageParameterRequisiteness.OPTIONAL);
+       }
+
+       @Override
+       public ResourceID convertStringToValue(String value) {
+               return new ResourceID(value);
+       }
+
+       @Override
+       public String convertValueToString(ResourceID value) {
+               return value.getResourceIdString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 7e552de..fb663ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -56,6 +56,9 @@ import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.AggregatingJobsMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.AggregatingTaskManagersMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
@@ -393,6 +396,33 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        responseHeaders,
                        metricFetcher);
 
+               final AggregatingTaskManagersMetricsHandler 
aggregatingTaskManagersMetricsHandler = new 
AggregatingTaskManagersMetricsHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       executor,
+                       metricFetcher
+               );
+
+               final AggregatingJobsMetricsHandler 
aggregatingJobsMetricsHandler = new AggregatingJobsMetricsHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       executor,
+                       metricFetcher
+               );
+
+               final AggregatingSubtasksMetricsHandler 
aggregatingSubtasksMetricsHandler = new AggregatingSubtasksMetricsHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       executor,
+                       metricFetcher
+               );
+
                final JobVertexTaskManagersHandler jobVertexTaskManagersHandler 
= new JobVertexTaskManagersHandler(
                        restAddressFuture,
                        leaderRetriever,
@@ -553,6 +583,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), 
subtaskMetricsHandler));
                handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), 
taskManagerMetricsHandler));
                handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), 
jobManagerMetricsHandler));
+               
handlers.add(Tuple2.of(aggregatingTaskManagersMetricsHandler.getMessageHeaders(),
 aggregatingTaskManagersMetricsHandler));
+               
handlers.add(Tuple2.of(aggregatingJobsMetricsHandler.getMessageHeaders(), 
aggregatingJobsMetricsHandler));
+               
handlers.add(Tuple2.of(aggregatingSubtasksMetricsHandler.getMessageHeaders(), 
aggregatingSubtasksMetricsHandler));
                handlers.add(Tuple2.of(JobExecutionResultHeaders.getInstance(), 
jobExecutionResultHandler));
                handlers.add(Tuple2.of(SavepointTriggerHeaders.getInstance(), 
savepointTriggerHandler));
                handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), 
savepointStatusHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
new file mode 100644
index 0000000..2dac8bf
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingJobsMetricsHandler}.
+ */
+public class AggregatingJobsMetricsHandlerTest extends 
AggregatingMetricsHandlerTestBase<AggregatingJobsMetricsHandler, 
AggregatedJobMetricsParameters> {
+
+       private static final JobID JOB_ID_1 = JobID.generate();
+       private static final JobID JOB_ID_2 = JobID.generate();
+       private static final JobID JOB_ID_3 = JobID.generate();
+
+       @Override
+       protected Tuple2<String, List<String>> getFilter() {
+               return Tuple2.of("jobs", Arrays.asList(JOB_ID_1.toString(), 
JOB_ID_3.toString()));
+       }
+
+       @Override
+       protected Collection<MetricDump> getMetricDumps() {
+               Collection<MetricDump> dumps = new ArrayList<>(3);
+               QueryScopeInfo.JobQueryScopeInfo job = new 
QueryScopeInfo.JobQueryScopeInfo(JOB_ID_1.toString(), "abc");
+               MetricDump.CounterDump cd1 = new MetricDump.CounterDump(job, 
"metric1", 1);
+               dumps.add(cd1);
+
+               QueryScopeInfo.JobQueryScopeInfo job2 = new 
QueryScopeInfo.JobQueryScopeInfo(JOB_ID_2.toString(), "abc");
+               MetricDump.CounterDump cd2 = new MetricDump.CounterDump(job2, 
"metric1", 3);
+               dumps.add(cd2);
+
+               QueryScopeInfo.JobQueryScopeInfo job3 = new 
QueryScopeInfo.JobQueryScopeInfo(JOB_ID_3.toString(), "abc");
+               MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job3, 
"metric2", 5);
+               dumps.add(cd3);
+               return dumps;
+       }
+
+       @Override
+       protected AggregatingJobsMetricsHandler 
getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> 
responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+               return new AggregatingJobsMetricsHandler(
+                       localRestAddress,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       executor,
+                       fetcher
+               );
+       }
+}

Reply via email to