http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
new file mode 100644
index 0000000..da115ee
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.files;
+
+/*****************************************************************************
+ * This code is based on the "HttpStaticFileServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
+ *****************************************************************************/
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Locale;
+import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.DATE;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple file server handler that serves requests to web frontend's static 
files, such as
+ * HTML, CSS, or JS files.
+ *
+ * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
+ * example.</p>
+ */
+@ChannelHandler.Sharable
+public class StaticFileServerHandler<T extends RestfulGateway> extends 
RedirectHandler<T> {
+
+       /** Timezone in which this server answers its "if-modified" requests. */
+       private static final TimeZone GMT_TIMEZONE = 
TimeZone.getTimeZone("GMT");
+
+       /** Date format for HTTP. */
+       public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy 
HH:mm:ss zzz";
+
+       /** Be default, we allow files to be cached for 5 minutes. */
+       private static final int HTTP_CACHE_SECONDS = 300;
+
+       // 
------------------------------------------------------------------------
+
+       /** The path in which the static documents are. */
+       private final File rootPath;
+
+       public StaticFileServerHandler(
+                       GatewayRetriever<T> retriever,
+                       CompletableFuture<String> localJobManagerAddressFuture,
+                       Time timeout,
+                       File rootPath) throws IOException {
+
+               super(localJobManagerAddressFuture, retriever, timeout);
+
+               this.rootPath = checkNotNull(rootPath).getCanonicalFile();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Responses to requests
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected void respondAsLeader(ChannelHandlerContext 
channelHandlerContext, Routed routed, T gateway) throws Exception {
+               final HttpRequest request = routed.request();
+               final String requestPath;
+
+               // make sure we request the "index.html" in case there is a 
directory request
+               if (routed.path().endsWith("/")) {
+                       requestPath = routed.path() + "index.html";
+               }
+               // in case the files being accessed are logs or stdout files, 
find appropriate paths.
+               else if (routed.path().equals("/jobmanager/log") || 
routed.path().equals("/jobmanager/stdout")) {
+                       requestPath = "";
+               } else {
+                       requestPath = routed.path();
+               }
+
+               respondToRequest(channelHandlerContext, request, requestPath);
+       }
+
+       /**
+        * Response when running with leading JobManager.
+        */
+       private void respondToRequest(ChannelHandlerContext ctx, HttpRequest 
request, String requestPath)
+                       throws IOException, ParseException, URISyntaxException {
+
+               // convert to absolute path
+               final File file = new File(rootPath, requestPath);
+
+               if (!file.exists()) {
+                       // file does not exist. Try to load it with the 
classloader
+                       ClassLoader cl = 
StaticFileServerHandler.class.getClassLoader();
+
+                       try (InputStream resourceStream = 
cl.getResourceAsStream("web" + requestPath)) {
+                               boolean success = false;
+                               try {
+                                       if (resourceStream != null) {
+                                               URL root = 
cl.getResource("web");
+                                               URL requested = 
cl.getResource("web" + requestPath);
+
+                                               if (root != null && requested 
!= null) {
+                                                       URI rootURI = new 
URI(root.getPath()).normalize();
+                                                       URI requestedURI = new 
URI(requested.getPath()).normalize();
+
+                                                       // Check that we don't 
load anything from outside of the
+                                                       // expected scope.
+                                                       if 
(!rootURI.relativize(requestedURI).equals(requestedURI)) {
+                                                               
logger.debug("Loading missing file from classloader: {}", requestPath);
+                                                               // ensure that 
directory to file exists.
+                                                               
file.getParentFile().mkdirs();
+                                                               
Files.copy(resourceStream, file.toPath());
+
+                                                               success = true;
+                                                       }
+                                               }
+                                       }
+                               } catch (Throwable t) {
+                                       logger.error("error while responding", 
t);
+                               } finally {
+                                       if (!success) {
+                                               logger.debug("Unable to load 
requested file {} from classloader", requestPath);
+                                               sendError(ctx, NOT_FOUND);
+                                               return;
+                                       }
+                               }
+                       }
+               }
+
+               if (!file.exists() || file.isHidden() || file.isDirectory() || 
!file.isFile()) {
+                       sendError(ctx, NOT_FOUND);
+                       return;
+               }
+
+               if 
(!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
+                       sendError(ctx, NOT_FOUND);
+                       return;
+               }
+
+               // cache validation
+               final String ifModifiedSince = 
request.headers().get(IF_MODIFIED_SINCE);
+               if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
+                       SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+                       Date ifModifiedSinceDate = 
dateFormatter.parse(ifModifiedSince);
+
+                       // Only compare up to the second because the datetime 
format we send to the client
+                       // does not have milliseconds
+                       long ifModifiedSinceDateSeconds = 
ifModifiedSinceDate.getTime() / 1000;
+                       long fileLastModifiedSeconds = file.lastModified() / 
1000;
+                       if (ifModifiedSinceDateSeconds == 
fileLastModifiedSeconds) {
+                               if (logger.isDebugEnabled()) {
+                                       logger.debug("Responding 'NOT MODIFIED' 
for file '" + file.getAbsolutePath() + '\'');
+                               }
+
+                               sendNotModified(ctx);
+                               return;
+                       }
+               }
+
+               if (logger.isDebugEnabled()) {
+                       logger.debug("Responding with file '" + 
file.getAbsolutePath() + '\'');
+               }
+
+               // Don't need to close this manually. Netty's DefaultFileRegion 
will take care of it.
+               final RandomAccessFile raf;
+               try {
+                       raf = new RandomAccessFile(file, "r");
+               }
+               catch (FileNotFoundException e) {
+                       sendError(ctx, NOT_FOUND);
+                       return;
+               }
+
+               try {
+                       long fileLength = raf.length();
+
+                       HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK);
+                       setContentTypeHeader(response, file);
+
+                       // since the log and out files are rapidly changing, we 
don't want to browser to cache them
+                       if (!(requestPath.contains("log") || 
requestPath.contains("out"))) {
+                               setDateAndCacheHeaders(response, file);
+                       }
+                       if (HttpHeaders.isKeepAlive(request)) {
+                               response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
+                       }
+                       HttpHeaders.setContentLength(response, fileLength);
+
+                       // write the initial line and the header.
+                       ctx.write(response);
+
+                       // write the content.
+                       ChannelFuture lastContentFuture;
+                       if (ctx.pipeline().get(SslHandler.class) == null) {
+                               ctx.write(new 
DefaultFileRegion(raf.getChannel(), 0, fileLength), 
ctx.newProgressivePromise());
+                               lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+                       } else {
+                               lastContentFuture = ctx.writeAndFlush(new 
HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
+                                       ctx.newProgressivePromise());
+                               // HttpChunkedInput will write the end marker 
(LastHttpContent) for us.
+                       }
+
+                       // close the connection, if no keep-alive is needed
+                       if (!HttpHeaders.isKeepAlive(request)) {
+                               
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+                       }
+               } catch (Exception e) {
+                       raf.close();
+                       logger.error("Failed to serve file.", e);
+                       sendError(ctx, INTERNAL_SERVER_ERROR);
+               }
+       }
+
+       @Override
+       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
{
+               if (ctx.channel().isActive()) {
+                       logger.error("Caught exception", cause);
+                       sendError(ctx, INTERNAL_SERVER_ERROR);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities to encode headers and responses
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Writes a simple  error response message.
+        *
+        * @param ctx    The channel context to write the response to.
+        * @param status The response status.
+        */
+       public static void sendError(ChannelHandlerContext ctx, 
HttpResponseStatus status) {
+               FullHttpResponse response = new DefaultFullHttpResponse(
+                               HTTP_1_1, status, 
Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
+               response.headers().set(CONTENT_TYPE, "text/plain; 
charset=UTF-8");
+
+               // close the connection as soon as the error message is sent.
+               
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+       }
+
+       /**
+        * Send the "304 Not Modified" response. This response can be used when 
the
+        * file timestamp is the same as what the browser is sending up.
+        *
+        * @param ctx The channel context to write the response to.
+        */
+       public static void sendNotModified(ChannelHandlerContext ctx) {
+               FullHttpResponse response = new 
DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
+               setDateHeader(response);
+
+               // close the connection as soon as the error message is sent.
+               
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+       }
+
+       /**
+        * Sets the "date" header for the HTTP response.
+        *
+        * @param response HTTP response
+        */
+       public static void setDateHeader(FullHttpResponse response) {
+               SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+               dateFormatter.setTimeZone(GMT_TIMEZONE);
+
+               Calendar time = new GregorianCalendar();
+               response.headers().set(DATE, 
dateFormatter.format(time.getTime()));
+       }
+
+       /**
+        * Sets the "date" and "cache" headers for the HTTP Response.
+        *
+        * @param response    The HTTP response object.
+        * @param fileToCache File to extract the modification timestamp from.
+        */
+       public static void setDateAndCacheHeaders(HttpResponse response, File 
fileToCache) {
+               SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+               dateFormatter.setTimeZone(GMT_TIMEZONE);
+
+               // date header
+               Calendar time = new GregorianCalendar();
+               response.headers().set(DATE, 
dateFormatter.format(time.getTime()));
+
+               // cache headers
+               time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
+               response.headers().set(EXPIRES, 
dateFormatter.format(time.getTime()));
+               response.headers().set(CACHE_CONTROL, "private, max-age=" + 
HTTP_CACHE_SECONDS);
+               response.headers().set(LAST_MODIFIED, dateFormatter.format(new 
Date(fileToCache.lastModified())));
+       }
+
+       /**
+        * Sets the content type header for the HTTP Response.
+        *
+        * @param response HTTP response
+        * @param file     file to extract content type
+        */
+       public static void setContentTypeHeader(HttpResponse response, File 
file) {
+               String mimeType = 
MimeTypes.getMimeTypeForFileName(file.getName());
+               String mimeFinal = mimeType != null ? mimeType : 
MimeTypes.getDefaultMimeType();
+               response.headers().set(CONTENT_TYPE, mimeFinal);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
new file mode 100644
index 0000000..315bdc2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract request handler that returns a list of all available metrics or 
the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public abstract class AbstractMetricsHandler extends 
AbstractJsonRequestHandler {
+       private final MetricFetcher fetcher;
+
+       public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) 
{
+               super(executor);
+               this.fetcher = Preconditions.checkNotNull(fetcher);
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               fetcher.update();
+                               String requestedMetricsList = 
queryParams.get("get");
+                               try {
+                                       return requestedMetricsList != null
+                                               ? getMetricsValues(pathParams, 
requestedMetricsList)
+                                               : 
getAvailableMetricsList(pathParams);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not retrieve metrics.", e);
+                               }
+                       },
+                       executor);
+
+       }
+
+       /**
+        * Returns a Map containing the metrics belonging to the entity pointed 
to by the path parameters.
+        *
+        * @param pathParams REST path parameters
+        * @param metrics MetricStore containing all metrics
+        * @return Map containing metrics, or null if no metric exists
+        */
+       protected abstract Map<String, String> getMapFor(Map<String, String> 
pathParams, MetricStore metrics);
+
+       private String getMetricsValues(Map<String, String> pathParams, String 
requestedMetricsList) throws IOException {
+               if (requestedMetricsList.isEmpty()) {
+                       /*
+                        * The WebInterface doesn't check whether the list of 
available metrics was empty. This can lead to a
+                        * request for which the "get" parameter is an empty 
string.
+                        */
+                       return "";
+               }
+               MetricStore metricStore = fetcher.getMetricStore();
+               synchronized (metricStore) {
+                       Map<String, String> metrics = getMapFor(pathParams, 
metricStore);
+                       if (metrics == null) {
+                               return "";
+                       }
+                       String[] requestedMetrics = 
requestedMetricsList.split(",");
+
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+                       gen.writeStartArray();
+                       for (String requestedMetric : requestedMetrics) {
+                               Object metricValue = 
metrics.get(requestedMetric);
+                               if (metricValue != null) {
+                                       gen.writeStartObject();
+                                       gen.writeStringField("id", 
requestedMetric);
+                                       gen.writeStringField("value", 
metricValue.toString());
+                                       gen.writeEndObject();
+                               }
+                       }
+                       gen.writeEndArray();
+
+                       gen.close();
+                       return writer.toString();
+               }
+       }
+
+       private String getAvailableMetricsList(Map<String, String> pathParams) 
throws IOException {
+               MetricStore metricStore = fetcher.getMetricStore();
+               synchronized (metricStore) {
+                       Map<String, String> metrics = getMapFor(pathParams, 
metricStore);
+                       if (metrics == null) {
+                               return "";
+                       }
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+                       gen.writeStartArray();
+                       for (String m : metrics.keySet()) {
+                               gen.writeStartObject();
+                               gen.writeStringField("id", m);
+                               gen.writeEndObject();
+                       }
+                       gen.writeEndArray();
+
+                       gen.close();
+                       return writer.toString();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
new file mode 100644
index 0000000..c568ee0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for the job manager a list of all available 
metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobManagerMetricsHandler extends AbstractMetricsHandler {
+
+       private static final String JOBMANAGER_METRICS_REST_PATH = 
"/jobmanager/metrics";
+
+       public JobManagerMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
+               super(executor, fetcher);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOBMANAGER_METRICS_REST_PATH};
+       }
+
+       @Override
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+               MetricStore.JobManagerMetricStore jobManager = 
metrics.getJobManagerMetricStore();
+               if (jobManager == null) {
+                       return null;
+               } else {
+                       return jobManager.metrics;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
new file mode 100644
index 0000000..7341eb8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given job a list of all available 
metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobMetricsHandler extends AbstractMetricsHandler {
+       public static final String PARAMETER_JOB_ID = "jobid";
+       private static final String JOB_METRICS_REST_PATH = 
"/jobs/:jobid/metrics";
+
+       public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
+               super(executor, fetcher);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_METRICS_REST_PATH};
+       }
+
+       @Override
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+               MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
+               return job != null
+                       ? job.metrics
+                       : null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
new file mode 100644
index 0000000..3a701ab
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given task a list of all available 
metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobVertexMetricsHandler extends AbstractMetricsHandler {
+       public static final String PARAMETER_VERTEX_ID = "vertexid";
+       private static final String JOB_VERTEX_METRICS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/metrics";
+
+       public JobVertexMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
+               super(executor, fetcher);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_METRICS_REST_PATH};
+       }
+
+       @Override
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+               MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
+                       pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
+                       pathParams.get(PARAMETER_VERTEX_ID));
+               return task != null
+                       ? task.metrics
+                       : null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
new file mode 100644
index 0000000..9f53808
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all 
registered TaskManagers.
+ *
+ * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is 
called, provided that a sufficient time since
+ * the last call has passed.
+ */
+public class MetricFetcher {
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetricFetcher.class);
+
+       private final GatewayRetriever<JobManagerGateway> retriever;
+       private final MetricQueryServiceRetriever queryServiceRetriever;
+       private final Executor executor;
+       private final Time timeout;
+
+       private final MetricStore metrics = new MetricStore();
+       private final MetricDumpDeserializer deserializer = new 
MetricDumpDeserializer();
+
+       private long lastUpdateTime;
+
+       public MetricFetcher(
+                       GatewayRetriever<JobManagerGateway> retriever,
+                       MetricQueryServiceRetriever queryServiceRetriever,
+                       Executor executor,
+                       Time timeout) {
+               this.retriever = Preconditions.checkNotNull(retriever);
+               this.queryServiceRetriever = 
Preconditions.checkNotNull(queryServiceRetriever);
+               this.executor = Preconditions.checkNotNull(executor);
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
+       /**
+        * Returns the MetricStore containing all stored metrics.
+        *
+        * @return MetricStore containing all stored metrics;
+        */
+       public MetricStore getMetricStore() {
+               return metrics;
+       }
+
+       /**
+        * This method can be used to signal this MetricFetcher that the 
metrics are still in use and should be updated.
+        */
+       public void update() {
+               synchronized (this) {
+                       long currentTime = System.currentTimeMillis();
+                       if (currentTime - lastUpdateTime > 10000) { // 10 
seconds have passed since the last update
+                               lastUpdateTime = currentTime;
+                               fetchMetrics();
+                       }
+               }
+       }
+
+       private void fetchMetrics() {
+               try {
+                       Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getNow();
+                       if (optJobManagerGateway.isPresent()) {
+                               final JobManagerGateway jobManagerGateway = 
optJobManagerGateway.get();
+
+                               /**
+                                * Remove all metrics that belong to a job that 
is not running and no longer archived.
+                                */
+                               CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+
+                               jobDetailsFuture.whenCompleteAsync(
+                                       (MultipleJobsDetails jobDetails, 
Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       LOG.debug("Fetching of 
JobDetails failed.", throwable);
+                                               } else {
+                                                       ArrayList<String> 
toRetain = new ArrayList<>();
+                                                       for (JobDetails job : 
jobDetails.getRunningJobs()) {
+                                                               
toRetain.add(job.getJobId().toString());
+                                                       }
+                                                       for (JobDetails job : 
jobDetails.getFinishedJobs()) {
+                                                               
toRetain.add(job.getJobId().toString());
+                                                       }
+                                                       synchronized (metrics) {
+                                                               
metrics.jobs.keySet().retainAll(toRetain);
+                                                       }
+                                               }
+                                       },
+                                       executor);
+
+                               String jobManagerPath = 
jobManagerGateway.getAddress();
+                               String jmQueryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+
+                               retrieveAndQueryMetrics(jmQueryServicePath);
+
+                               /**
+                                * We first request the list of all registered 
task managers from the job manager, and then
+                                * request the respective metric dump from each 
task manager.
+                                *
+                                * <p>All stored metrics that do not belong to 
a registered task manager will be removed.
+                                */
+                               CompletableFuture<Collection<Instance>> 
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+                               taskManagersFuture.whenCompleteAsync(
+                                       (Collection<Instance> taskManagers, 
Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       LOG.debug("Fetching 
list of registered TaskManagers failed.", throwable);
+                                               } else {
+                                                       List<String> 
activeTaskManagers = taskManagers.stream().map(
+                                                               
taskManagerInstance -> {
+                                                                       final 
String taskManagerAddress = 
taskManagerInstance.getTaskManagerGateway().getAddress();
+                                                                       final 
String tmQueryServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManagerInstance.getTaskManagerID().getResourceIdString();
+
+                                                                       
retrieveAndQueryMetrics(tmQueryServicePath);
+
+                                                                       return 
taskManagerInstance.getId().toString();
+                                                               
}).collect(Collectors.toList());
+
+                                                       synchronized (metrics) {
+                                                               
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
+                                                       }
+                                               }
+                                       },
+                                       executor);
+                       }
+               } catch (Exception e) {
+                       LOG.warn("Exception while fetching metrics.", e);
+               }
+       }
+
+       /**
+        * Retrieves and queries the specified QueryServiceGateway.
+        *
+        * @param queryServicePath specifying the QueryServiceGateway
+        */
+       private void retrieveAndQueryMetrics(String queryServicePath) {
+               final CompletableFuture<MetricQueryServiceGateway> 
queryServiceGatewayFuture = 
queryServiceRetriever.retrieveService(queryServicePath);
+
+               queryServiceGatewayFuture.whenCompleteAsync(
+                       (MetricQueryServiceGateway queryServiceGateway, 
Throwable t) -> {
+                               if (t != null) {
+                                       LOG.debug("Could not retrieve 
QueryServiceGateway.", t);
+                               } else {
+                                       queryMetrics(queryServiceGateway);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Query the metrics from the given QueryServiceGateway.
+        *
+        * @param queryServiceGateway to query for metrics
+        */
+       private void queryMetrics(final MetricQueryServiceGateway 
queryServiceGateway) {
+               queryServiceGateway
+                       .queryMetrics(timeout)
+                       .whenCompleteAsync(
+                               
(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
+                                       if (t != null) {
+                                               LOG.debug("Fetching metrics 
failed.", t);
+                                       } else {
+                                               List<MetricDump> dumpedMetrics 
= deserializer.deserialize(result);
+                                               synchronized (metrics) {
+                                                       for (MetricDump metric 
: dumpedMetrics) {
+                                                               
metrics.add(metric);
+                                                       }
+                                               }
+                                       }
+                               },
+                               executor);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
new file mode 100644
index 0000000..6d3fc99
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * <p>This structure is not thread-safe.
+ */
+public class MetricStore {
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetricStore.class);
+
+       final JobManagerMetricStore jobManager = new JobManagerMetricStore();
+       final Map<String, TaskManagerMetricStore> taskManagers = new 
HashMap<>();
+       final Map<String, JobMetricStore> jobs = new HashMap<>();
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+       // Adding metrics
+       // 
-----------------------------------------------------------------------------------------------------------------
+       public void add(MetricDump metric) {
+               try {
+                       QueryScopeInfo info = metric.scopeInfo;
+                       TaskManagerMetricStore tm;
+                       JobMetricStore job;
+                       TaskMetricStore task;
+                       SubtaskMetricStore subtask;
+
+                       String name = info.scope.isEmpty()
+                               ? metric.name
+                               : info.scope + "." + metric.name;
+
+                       if (name.isEmpty()) { // malformed transmission
+                               return;
+                       }
+
+                       switch (info.getCategory()) {
+                               case INFO_CATEGORY_JM:
+                                       addMetric(jobManager.metrics, name, 
metric);
+                                       break;
+                               case INFO_CATEGORY_TM:
+                                       String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+                                       tm = taskManagers.get(tmID);
+                                       if (tm == null) {
+                                               tm = new 
TaskManagerMetricStore();
+                                               taskManagers.put(tmID, tm);
+                                       }
+                                       if (name.contains("GarbageCollector")) {
+                                               String gcName = 
name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.'));
+                                               
tm.addGarbageCollectorName(gcName);
+                                       }
+                                       addMetric(tm.metrics, name, metric);
+                                       break;
+                               case INFO_CATEGORY_JOB:
+                                       QueryScopeInfo.JobQueryScopeInfo 
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+                                       job = jobs.get(jobInfo.jobID);
+                                       if (job == null) {
+                                               job = new JobMetricStore();
+                                               jobs.put(jobInfo.jobID, job);
+                                       }
+                                       addMetric(job.metrics, name, metric);
+                                       break;
+                               case INFO_CATEGORY_TASK:
+                                       QueryScopeInfo.TaskQueryScopeInfo 
taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+                                       job = jobs.get(taskInfo.jobID);
+                                       if (job == null) {
+                                               job = new JobMetricStore();
+                                               jobs.put(taskInfo.jobID, job);
+                                       }
+                                       task = job.tasks.get(taskInfo.vertexID);
+                                       if (task == null) {
+                                               task = new TaskMetricStore();
+                                               
job.tasks.put(taskInfo.vertexID, task);
+                                       }
+                                       subtask = 
task.subtasks.get(taskInfo.subtaskIndex);
+                                       if (subtask == null) {
+                                               subtask = new 
SubtaskMetricStore();
+                                               
task.subtasks.put(taskInfo.subtaskIndex, subtask);
+                                       }
+                                       /**
+                                        * The duplication is intended. Metrics 
scoped by subtask are useful for several job/task handlers,
+                                        * while the WebInterface task metric 
queries currently do not account for subtasks, so we don't
+                                        * divide by subtask and instead use 
the concatenation of subtask index and metric name as the name
+                                        * for those.
+                                        */
+                                       addMetric(subtask.metrics, name, 
metric);
+                                       addMetric(task.metrics, 
taskInfo.subtaskIndex + "." + name, metric);
+                                       break;
+                               case INFO_CATEGORY_OPERATOR:
+                                       QueryScopeInfo.OperatorQueryScopeInfo 
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+                                       job = jobs.get(operatorInfo.jobID);
+                                       if (job == null) {
+                                               job = new JobMetricStore();
+                                               jobs.put(operatorInfo.jobID, 
job);
+                                       }
+                                       task = 
job.tasks.get(operatorInfo.vertexID);
+                                       if (task == null) {
+                                               task = new TaskMetricStore();
+                                               
job.tasks.put(operatorInfo.vertexID, task);
+                                       }
+                                       /**
+                                        * As the WebInterface does not account 
for operators (because it can't) we don't
+                                        * divide by operator and instead use 
the concatenation of subtask index, operator name and metric name
+                                        * as the name.
+                                        */
+                                       addMetric(task.metrics, 
operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, 
metric);
+                                       break;
+                               default:
+                                       LOG.debug("Invalid metric dump 
category: " + info.getCategory());
+                       }
+               } catch (Exception e) {
+                       LOG.debug("Malformed metric dump.", e);
+               }
+       }
+
+       private void addMetric(Map<String, String> target, String name, 
MetricDump metric) {
+               switch (metric.getCategory()) {
+                       case METRIC_CATEGORY_COUNTER:
+                               MetricDump.CounterDump counter = 
(MetricDump.CounterDump) metric;
+                               target.put(name, String.valueOf(counter.count));
+                               break;
+                       case METRIC_CATEGORY_GAUGE:
+                               MetricDump.GaugeDump gauge = 
(MetricDump.GaugeDump) metric;
+                               target.put(name, gauge.value);
+                               break;
+                       case METRIC_CATEGORY_HISTOGRAM:
+                               MetricDump.HistogramDump histogram = 
(MetricDump.HistogramDump) metric;
+                               target.put(name + "_min", 
String.valueOf(histogram.min));
+                               target.put(name + "_max", 
String.valueOf(histogram.max));
+                               target.put(name + "_mean", 
String.valueOf(histogram.mean));
+                               target.put(name + "_median", 
String.valueOf(histogram.median));
+                               target.put(name + "_stddev", 
String.valueOf(histogram.stddev));
+                               target.put(name + "_p75", 
String.valueOf(histogram.p75));
+                               target.put(name + "_p90", 
String.valueOf(histogram.p90));
+                               target.put(name + "_p95", 
String.valueOf(histogram.p95));
+                               target.put(name + "_p98", 
String.valueOf(histogram.p98));
+                               target.put(name + "_p99", 
String.valueOf(histogram.p99));
+                               target.put(name + "_p999", 
String.valueOf(histogram.p999));
+                               break;
+                       case METRIC_CATEGORY_METER:
+                               MetricDump.MeterDump meter = 
(MetricDump.MeterDump) metric;
+                               target.put(name, String.valueOf(meter.rate));
+                               break;
+               }
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+       // Accessors for sub MetricStores
+       // 
-----------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the {@link JobManagerMetricStore}.
+        *
+        * @return JobManagerMetricStore
+        */
+       public JobManagerMetricStore getJobManagerMetricStore() {
+               return jobManager;
+       }
+
+       /**
+        * Returns the {@link TaskManagerMetricStore} for the given taskmanager 
ID.
+        *
+        * @param tmID taskmanager ID
+        * @return TaskManagerMetricStore for the given ID, or null if no store 
for the given argument exists
+        */
+       public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
+               return taskManagers.get(tmID);
+       }
+
+       /**
+        * Returns the {@link JobMetricStore} for the given job ID.
+        *
+        * @param jobID job ID
+        * @return JobMetricStore for the given ID, or null if no store for the 
given argument exists
+        */
+       public JobMetricStore getJobMetricStore(String jobID) {
+               return jobs.get(jobID);
+       }
+
+       /**
+        * Returns the {@link TaskMetricStore} for the given job/task ID.
+        *
+        * @param jobID  job ID
+        * @param taskID task ID
+        * @return TaskMetricStore for given IDs, or null if no store for the 
given arguments exists
+        */
+       public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
+               JobMetricStore job = getJobMetricStore(jobID);
+               if (job == null) {
+                       return null;
+               }
+               return job.getTaskMetricStore(taskID);
+       }
+
+       /**
+        * Returns the {@link SubtaskMetricStore} for the given job/task ID and 
subtask index.
+        *
+        * @param jobID        job ID
+        * @param taskID       task ID
+        * @param subtaskIndex subtask index
+        * @return SubtaskMetricStore for the given IDs and index, or null if 
no store for the given arguments exists
+        */
+       public SubtaskMetricStore getSubtaskMetricStore(String jobID, String 
taskID, int subtaskIndex) {
+               TaskMetricStore task = getTaskMetricStore(jobID, taskID);
+               if (task == null) {
+                       return null;
+               }
+               return task.getSubtaskMetricStore(subtaskIndex);
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+       // sub MetricStore classes
+       // 
-----------------------------------------------------------------------------------------------------------------
+       private abstract static class ComponentMetricStore {
+               public final Map<String, String> metrics = new HashMap<>();
+
+               public String getMetric(String name, String defaultValue) {
+                       String value = this.metrics.get(name);
+                       return value != null
+                               ? value
+                               : defaultValue;
+               }
+       }
+
+       /**
+        * Sub-structure containing metrics of the JobManager.
+        */
+       public static class JobManagerMetricStore extends ComponentMetricStore {
+       }
+
+       /**
+        * Sub-structure containing metrics of a single TaskManager.
+        */
+       public static class TaskManagerMetricStore extends ComponentMetricStore 
{
+               public final Set<String> garbageCollectorNames = new 
HashSet<>();
+
+               public void addGarbageCollectorName(String name) {
+                       garbageCollectorNames.add(name);
+               }
+       }
+
+       /**
+        * Sub-structure containing metrics of a single Job.
+        */
+       public static class JobMetricStore extends ComponentMetricStore {
+               private final Map<String, TaskMetricStore> tasks = new 
HashMap<>();
+
+               public TaskMetricStore getTaskMetricStore(String taskID) {
+                       return tasks.get(taskID);
+               }
+       }
+
+       /**
+        * Sub-structure containing metrics of a single Task.
+        */
+       public static class TaskMetricStore extends ComponentMetricStore {
+               private final Map<Integer, SubtaskMetricStore> subtasks = new 
HashMap<>();
+
+               public SubtaskMetricStore getSubtaskMetricStore(int 
subtaskIndex) {
+                       return subtasks.get(subtaskIndex);
+               }
+       }
+
+       /**
+        * Sub-structure containing metrics of a single Subtask.
+        */
+       public static class SubtaskMetricStore extends ComponentMetricStore {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
new file mode 100644
index 0000000..90bafb7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given task manager a list of all 
available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
+
+       private static final String TASKMANAGER_METRICS_REST_PATH = 
"/taskmanagers/:taskmanagerid/metrics";
+
+       public TaskManagerMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
+               super(executor, fetcher);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{TASKMANAGER_METRICS_REST_PATH};
+       }
+
+       @Override
+       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+               MetricStore.TaskManagerMetricStore taskManager = 
metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY));
+               if (taskManager == null) {
+                       return null;
+               } else {
+                       return taskManager.metrics;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
new file mode 100644
index 0000000..e2aaaf7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * This class is a mutable version of the {@link IOMetrics} class that allows 
adding up IO-related metrics.
+ *
+ * <p>For finished jobs these metrics are stored in the {@link ExecutionGraph} 
as another {@link IOMetrics}.
+ * For running jobs these metrics are retrieved using the {@link 
MetricFetcher}.
+ *
+ * <p>This class provides a common interface to handle both cases, reducing 
complexity in various handlers (like
+ * the {@link JobVertexDetailsHandler}).
+ */
+public class MutableIOMetrics extends IOMetrics {
+
+       private static final long serialVersionUID = -5460777634971381737L;
+
+       public MutableIOMetrics() {
+               super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
+       }
+
+       /**
+        * Adds the IO metrics for the given attempt to this object. If the 
{@link AccessExecution} is in
+        * a terminal state the contained {@link IOMetrics} object is added. 
Otherwise the given {@link MetricFetcher} is
+        * used to retrieve the required metrics.
+        *
+        * @param attempt Attempt whose IO metrics should be added
+        * @param fetcher MetricFetcher to retrieve metrics for running jobs
+        * @param jobID JobID to which the attempt belongs
+        * @param taskID TaskID to which the attempt belongs
+        */
+       public void addIOMetrics(AccessExecution attempt, @Nullable 
MetricFetcher fetcher, String jobID, String taskID) {
+               if (attempt.getState().isTerminal()) {
+                       IOMetrics ioMetrics = attempt.getIOMetrics();
+                       if (ioMetrics != null) { // execAttempt is already 
finished, use final metrics stored in ExecutionGraph
+                               this.numBytesInLocal += 
ioMetrics.getNumBytesInLocal();
+                               this.numBytesInRemote += 
ioMetrics.getNumBytesInRemote();
+                               this.numBytesOut += ioMetrics.getNumBytesOut();
+                               this.numRecordsIn += 
ioMetrics.getNumRecordsIn();
+                               this.numRecordsOut += 
ioMetrics.getNumRecordsOut();
+                       }
+               } else { // execAttempt is still running, use 
MetricQueryService instead
+                       if (fetcher != null) {
+                               fetcher.update();
+                               MetricStore.SubtaskMetricStore metrics = 
fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, 
attempt.getParallelSubtaskIndex());
+                               if (metrics != null) {
+                                       this.numBytesInLocal += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
+                                       this.numBytesInRemote += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+                                       this.numBytesOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+                                       this.numRecordsIn += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+                                       this.numRecordsOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Writes the IO metrics contained in this object to the given {@link 
JsonGenerator}.
+        *
+        * <p>The JSON structure written is as follows:
+        * "metrics": {
+        *     "read-bytes": 1,
+        *     "write-bytes": 2,
+        *     "read-records": 3,
+        *     "write-records": 4
+        * }
+        *
+        * @param gen JsonGenerator to which the metrics should be written
+        * @throws IOException
+        */
+       public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
+               gen.writeObjectFieldStart("metrics");
+               gen.writeNumberField("read-bytes", this.numBytesInLocal + 
this.numBytesInRemote);
+               gen.writeNumberField("write-bytes", this.numBytesOut);
+               gen.writeNumberField("read-records", this.numRecordsIn);
+               gen.writeNumberField("write-records", this.numRecordsOut);
+               gen.writeEndObject();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
new file mode 100644
index 0000000..5bfa1f9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the ClusterOverviewHandler.
+ */
+public class ClusterOverviewHandlerTest {
+       @Test
+       public void testGetPaths() {
+               ClusterOverviewHandler handler = new 
ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/overview", paths[0]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
new file mode 100644
index 0000000..0ada30d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the CurrentJobIdsHandler.
+ */
+public class CurrentJobIdsHandlerTest {
+       @Test
+       public void testGetPaths() {
+               CurrentJobIdsHandler handler = new 
CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L));
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs", paths[0]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
new file mode 100644
index 0000000..83bb157
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+
+/**
+ * Tests for the CurrentJobsOverviewHandler.
+ */
+public class CurrentJobsOverviewHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               JobDetails expectedDetails = 
WebMonitorUtils.createDetailsForJob(originalJob);
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/joboverview", archive.getPath());
+
+               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
+               ArrayNode running = (ArrayNode) result.get("running");
+               Assert.assertEquals(0, running.size());
+
+               ArrayNode finished = (ArrayNode) result.get("finished");
+               Assert.assertEquals(1, finished.size());
+
+               compareJobOverview(expectedDetails, finished.get(0).toString());
+       }
+
+       @Test
+       public void testGetPaths() {
+               CurrentJobsOverviewHandler handlerAll = new 
CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, 
true);
+               String[] pathsAll = handlerAll.getPaths();
+               Assert.assertEquals(1, pathsAll.length);
+               Assert.assertEquals("/joboverview", pathsAll[0]);
+
+               CurrentJobsOverviewHandler handlerRunning = new 
CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, 
false);
+               String[] pathsRunning = handlerRunning.getPaths();
+               Assert.assertEquals(1, pathsRunning.length);
+               Assert.assertEquals("/joboverview/running", pathsRunning[0]);
+
+               CurrentJobsOverviewHandler handlerCompleted = new 
CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, 
true);
+               String[] pathsCompleted = handlerCompleted.getPaths();
+               Assert.assertEquals(1, pathsCompleted.length);
+               Assert.assertEquals("/joboverview/completed", 
pathsCompleted[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               JobDetails expectedDetails = 
WebMonitorUtils.createDetailsForJob(originalJob);
+               StringWriter writer = new StringWriter();
+               try (JsonGenerator gen = 
ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
+                       
CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 
0);
+               }
+               compareJobOverview(expectedDetails, writer.toString());
+       }
+
+       private static void compareJobOverview(JobDetails expectedDetails, 
String answer) throws IOException {
+               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(answer);
+
+               Assert.assertEquals(expectedDetails.getJobId().toString(), 
result.get("jid").asText());
+               Assert.assertEquals(expectedDetails.getJobName(), 
result.get("name").asText());
+               Assert.assertEquals(expectedDetails.getStatus().name(), 
result.get("state").asText());
+
+               Assert.assertEquals(expectedDetails.getStartTime(), 
result.get("start-time").asLong());
+               Assert.assertEquals(expectedDetails.getEndTime(), 
result.get("end-time").asLong());
+               Assert.assertEquals(expectedDetails.getEndTime() - 
expectedDetails.getStartTime(), result.get("duration").asLong());
+               Assert.assertEquals(expectedDetails.getLastUpdateTime(), 
result.get("last-modification").asLong());
+
+               JsonNode tasks = result.get("tasks");
+               Assert.assertEquals(expectedDetails.getNumTasks(), 
tasks.get("total").asInt());
+               int[] tasksPerState = 
expectedDetails.getNumVerticesPerExecutionState();
+               Assert.assertEquals(
+                       tasksPerState[ExecutionState.CREATED.ordinal()] + 
tasksPerState[ExecutionState.SCHEDULED.ordinal()] + 
tasksPerState[ExecutionState.DEPLOYING.ordinal()],
+                       tasks.get("pending").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], 
tasks.get("running").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], 
tasks.get("finished").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], 
tasks.get("canceling").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], 
tasks.get("canceled").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], 
tasks.get("failed").asInt());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
new file mode 100644
index 0000000..06a99fe
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.TimeZone;
+
+/**
+ * Tests for the DashboardConfigHandler.
+ */
+public class DashboardConfigHandlerTest {
+       @Test
+       public void testGetPaths() {
+               DashboardConfigHandler handler = new 
DashboardConfigHandler(Executors.directExecutor(), 10000L);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/config", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               long refreshInterval = 12345;
+               TimeZone timeZone = TimeZone.getDefault();
+               EnvironmentInformation.RevisionInformation revision = 
EnvironmentInformation.getRevisionInformation();
+
+               String json = 
DashboardConfigHandler.createConfigJson(refreshInterval);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(refreshInterval, 
result.get("refresh-interval").asLong());
+               Assert.assertEquals(timeZone.getDisplayName(), 
result.get("timezone-name").asText());
+               Assert.assertEquals(timeZone.getRawOffset(), 
result.get("timezone-offset").asLong());
+               Assert.assertEquals(EnvironmentInformation.getVersion(), 
result.get("flink-version").asText());
+               Assert.assertEquals(revision.commitId + " @ " + 
revision.commitDate, result.get("flink-revision").asText());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
new file mode 100644
index 0000000..7e96835
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the HandlerRedirectUtils.
+ */
+public class HandlerRedirectUtilsTest extends TestLogger {
+
+       private static final String localRestAddress = "http://127.0.0.1:1234";;
+       private static final String remoteRestAddress = "http://127.0.0.2:1234";;
+
+       @Test
+       public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() 
throws Exception {
+               JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
+               
when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress));
+
+               CompletableFuture<Optional<String>> redirectingAddressFuture = 
HandlerRedirectUtils.getRedirectAddress(
+                       localRestAddress,
+                       jobManagerGateway,
+                       Time.seconds(3L));
+
+               Assert.assertTrue(redirectingAddressFuture.isDone());
+               // no redirection needed
+               Assert.assertFalse(redirectingAddressFuture.get().isPresent());
+       }
+
+       @Test
+       public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception 
{
+               JobManagerGateway jobManagerGateway = 
mock(AkkaJobManagerGateway.class);
+               
when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(remoteRestAddress));
+
+               CompletableFuture<Optional<String>> optRedirectingAddress = 
HandlerRedirectUtils.getRedirectAddress(
+                       localRestAddress,
+                       jobManagerGateway,
+                       Time.seconds(3L));
+
+               Assert.assertTrue(optRedirectingAddress.isDone());
+
+               Assert.assertEquals(remoteRestAddress, 
optRedirectingAddress.get().get());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..e1736c1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobAccumulatorsHandler.
+ */
+public class JobAccumulatorsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobAccumulatorsHandler.JobAccumulatorsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/accumulators", archive.getPath());
+               compareAccumulators(originalJob, archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               JobAccumulatorsHandler handler = new 
JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               String json = 
JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
+
+               compareAccumulators(originalJob, json);
+       }
+
+       private static void compareAccumulators(AccessExecutionGraph 
originalJob, String json) throws IOException {
+               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               ArrayNode accs = (ArrayNode) result.get("job-accumulators");
+               Assert.assertEquals(0, accs.size());
+
+               
Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0);
+               ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+                       originalJob.getAccumulatorResultsStringified(),
+                       (ArrayNode) result.get("user-task-accumulators"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
new file mode 100644
index 0000000..cab8835
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for the JobCancellationHandler.
+ */
+public class JobCancellationHandlerTest {
+       @Test
+       public void testGetPaths() {
+               JobCancellationHandler handler = new 
JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(2, paths.length);
+               List<String> pathsList = Lists.newArrayList(paths);
+               Assert.assertTrue(pathsList.contains("/jobs/:jobid/cancel"));
+               
Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-cancel"));
+       }
+}

Reply via email to