http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index ec8516d..745a110 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -47,10 +47,10 @@ public class JarUploadHandler extends 
AbstractJsonRequestHandler {
                                Map<String, String> pathParams,
                                Map<String, String> queryParams,
                                ActorGateway jobManager) throws Exception {
-               
+
                String tempFilePath = queryParams.get("filepath");
                String filename = queryParams.get("filename");
-               
+
                File tempFile;
                if (tempFilePath != null && (tempFile = new 
File(tempFilePath)).exists()) {
                        if (!tempFile.getName().endsWith(".jar")) {
@@ -58,7 +58,7 @@ public class JarUploadHandler extends 
AbstractJsonRequestHandler {
                                tempFile.delete();
                                return "{\"error\": \"Only Jar files are 
allowed.\"}";
                        }
-                       
+
                        String filenameWithUUID = UUID.randomUUID() + "_" + 
filename;
                        File newFile = new File(jarDir, filenameWithUUID);
                        if (tempFile.renameTo(newFile)) {
@@ -70,7 +70,7 @@ public class JarUploadHandler extends 
AbstractJsonRequestHandler {
                                tempFile.delete();
                        }
                }
-               
+
                return "{\"error\": \"Failed to upload the file.\"}";
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index c403aa2..163e583 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -37,7 +38,7 @@ import java.util.Map;
 public class JobAccumulatorsHandler extends 
AbstractExecutionGraphRequestHandler {
 
        private static final String JOB_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/accumulators";
-       
+
        public JobAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
@@ -52,6 +53,9 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphRequestHandler
                return createJobAccumulatorsJson(graph);
        }
 
+       /**
+        * Archivist for the JobAccumulatorsHandler.
+        */
        public static class JobAccumulatorsJsonArchivist implements 
JsonArchivist {
 
                @Override
@@ -65,7 +69,7 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphRequestHandler
 
        public static String createJobAccumulatorsJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                StringifiedAccumulatorResult[] allAccumulators = 
graph.getAccumulatorResultsStringified();
 
@@ -74,7 +78,7 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphRequestHandler
                gen.writeArrayFieldStart("job-accumulators");
                // empty for now
                gen.writeEndArray();
-               
+
                gen.writeArrayFieldStart("user-task-accumulators");
                for (StringifiedAccumulatorResult acc : allAccumulators) {
                        gen.writeStartObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index f5d6853..3f7b824 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import akka.dispatch.OnComplete;
-import com.fasterxml.jackson.core.JsonGenerator;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -36,11 +28,18 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoi
 import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import akka.dispatch.OnComplete;
+import com.fasterxml.jackson.core.JsonGenerator;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.nio.charset.Charset;
@@ -48,6 +47,10 @@ import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.Map;
 
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -67,13 +70,13 @@ public class JobCancellationWithSavepointHandlers {
        /** Shared lock between Trigger and In-Progress handlers. */
        private final Object lock = new Object();
 
-       /** In-Progress requests */
+       /** In-Progress requests. */
        private final Map<JobID, Long> inProgress = new HashMap<>();
 
        /** Succeeded/failed request. Either String or Throwable. */
        private final Map<Long, Object> completed = new HashMap<>();
 
-       /** Atomic request counter */
+       /** Atomic request counter. */
        private long requestCounter;
 
        /** Handler for trigger requests. */
@@ -244,7 +247,7 @@ public class JobCancellationWithSavepointHandlers {
 
                        // Accepted response
                        StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
                        gen.writeStartObject();
                        gen.writeStringField("status", "accepted");
                        gen.writeNumberField("request-id", requestId);
@@ -283,7 +286,7 @@ public class JobCancellationWithSavepointHandlers {
                /** The number of recent checkpoints whose IDs are remembered. 
*/
                private static final int NUM_GHOST_REQUEST_IDS = 16;
 
-               /** Remember some recently completed */
+               /** Remember some recently completed. */
                private final ArrayDeque<Tuple2<Long, Object>> 
recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
 
                @Override
@@ -324,7 +327,7 @@ public class JobCancellationWithSavepointHandlers {
                                                                if 
(inProgressRequestId == requestId) {
                                                                        return 
createInProgressResponse(requestId);
                                                                } else {
-                                                                       String 
msg= "Request ID does not belong to JobID";
+                                                                       String 
msg = "Request ID does not belong to JobID";
                                                                        return 
createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
                                                                }
                                                        }
@@ -355,7 +358,7 @@ public class JobCancellationWithSavepointHandlers {
 
                private FullHttpResponse createSuccessResponse(long requestId, 
String savepointPath) throws IOException {
                        StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
                        gen.writeStartObject();
 
                        gen.writeStringField("status", "success");
@@ -381,7 +384,7 @@ public class JobCancellationWithSavepointHandlers {
 
                private FullHttpResponse createInProgressResponse(long 
requestId) throws IOException {
                        StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
                        gen.writeStartObject();
 
                        gen.writeStringField("status", "in-progress");
@@ -406,7 +409,7 @@ public class JobCancellationWithSavepointHandlers {
 
                private FullHttpResponse 
createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) 
throws IOException {
                        StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
                        gen.writeStartObject();
 
                        gen.writeStringField("status", "failed");

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 2b96456..72cf8b7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 
 /**
  * Request handler that returns the execution config of a job.
@@ -53,6 +53,9 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
                return createJobConfigJson(graph);
        }
 
+       /**
+        * Archivist for the JobConfigHandler.
+        */
        public static class JobConfigJsonArchivist implements JsonArchivist {
 
                @Override
@@ -66,7 +69,7 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
 
        public static String createJobConfigJson(AccessExecutionGraph graph) 
throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                gen.writeStartObject();
                gen.writeStringField("jid", graph.getJobID().toString());
@@ -86,7 +89,7 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
                        Map<String, String> ucVals = 
summary.getGlobalJobParameters();
                        if (ucVals != null) {
                                gen.writeObjectFieldStart("user-config");
-                               
+
                                for (Map.Entry<String, String> ucVal : 
ucVals.entrySet()) {
                                        gen.writeStringField(ucVal.getKey(), 
ucVal.getValue());
                                }
@@ -97,7 +100,7 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
                        gen.writeEndObject();
                }
                gen.writeEndObject();
-               
+
                gen.close();
                return writer.toString();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 37a1c19..87ac7c3 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -32,7 +30,10 @@ import 
org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -40,7 +41,7 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * Request handler that returns details about a job, including:
+ * Request handler that returns details about a job. This includes:
  * <ul>
  *     <li>Dataflow plan</li>
  *     <li>id, name, and current status</li>
@@ -71,6 +72,9 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                return createJobDetailsJson(graph, fetcher);
        }
 
+       /**
+        * Archivist for the JobDetailsHandler.
+        */
        public static class JobDetailsJsonArchivist implements JsonArchivist {
 
                @Override
@@ -89,18 +93,18 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
 
        public static String createJobDetailsJson(AccessExecutionGraph graph, 
@Nullable MetricFetcher fetcher) throws IOException {
                final StringWriter writer = new StringWriter();
-               final JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               final JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                final long now = System.currentTimeMillis();
-               
+
                gen.writeStartObject();
-               
+
                // basic info
                gen.writeStringField("jid", graph.getJobID().toString());
                gen.writeStringField("name", graph.getJobName());
                gen.writeBooleanField("isStoppable", graph.isStoppable());
                gen.writeStringField("state", graph.getState().name());
-               
+
                // times and duration
                final long jobStartTime = 
graph.getStatusTimestamp(JobStatus.CREATED);
                final long jobEndTime = 
graph.getState().isGloballyTerminalState() ?
@@ -109,14 +113,14 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                gen.writeNumberField("end-time", jobEndTime);
                gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : 
now) - jobStartTime);
                gen.writeNumberField("now", now);
-               
+
                // timestamps
                gen.writeObjectFieldStart("timestamps");
                for (JobStatus status : JobStatus.values()) {
                        gen.writeNumberField(status.name(), 
graph.getStatusTimestamp(status));
                }
                gen.writeEndObject();
-               
+
                // job vertices
                int[] jobVerticesPerState = new 
int[ExecutionState.values().length];
                gen.writeArrayFieldStart("vertices");
@@ -126,7 +130,7 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                        long startTime = Long.MAX_VALUE;
                        long endTime = 0;
                        boolean allFinished = true;
-                       
+
                        for (AccessExecutionVertex vertex : 
ejv.getTaskVertices()) {
                                final ExecutionState state = 
vertex.getExecutionState();
                                tasksPerState[state.ordinal()]++;
@@ -136,11 +140,11 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                                if (started > 0) {
                                        startTime = Math.min(startTime, 
started);
                                }
-                               
+
                                allFinished &= state.isTerminal();
                                endTime = Math.max(endTime, 
vertex.getStateTimestamp(state));
                        }
-                       
+
                        long duration;
                        if (startTime < Long.MAX_VALUE) {
                                if (allFinished) {
@@ -156,8 +160,8 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                                endTime = -1L;
                                duration = -1L;
                        }
-                       
-                       ExecutionState jobVertexState = 
+
+                       ExecutionState jobVertexState =
                                        
ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, 
ejv.getParallelism());
                        jobVerticesPerState[jobVertexState.ordinal()]++;
 
@@ -170,13 +174,13 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                        gen.writeNumberField("start-time", startTime);
                        gen.writeNumberField("end-time", endTime);
                        gen.writeNumberField("duration", duration);
-                       
+
                        gen.writeObjectFieldStart("tasks");
                        for (ExecutionState state : ExecutionState.values()) {
                                gen.writeNumberField(state.name(), 
tasksPerState[state.ordinal()]);
                        }
                        gen.writeEndObject();
-                       
+
                        MutableIOMetrics counts = new MutableIOMetrics();
 
                        for (AccessExecutionVertex vertex : 
ejv.getTaskVertices()) {
@@ -188,7 +192,7 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                        }
 
                        counts.writeIOMetricsAsJson(gen);
-                       
+
                        gen.writeEndObject();
                }
                gen.writeEndArray();

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 81cdc83..181b270 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -27,6 +26,8 @@ import 
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -41,7 +42,7 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
        private static final String JOB_EXCEPTIONS_REST_PATH = 
"/jobs/:jobid/exceptions";
 
        static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
-       
+
        public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
                super(executionGraphHolder);
        }
@@ -56,6 +57,9 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
                return createJobExceptionsJson(graph);
        }
 
+       /**
+        * Archivist for the JobExceptionsHandler.
+        */
        public static class JobExceptionsJsonArchivist implements JsonArchivist 
{
 
                @Override
@@ -69,10 +73,10 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
 
        public static String createJobExceptionsJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                gen.writeStartObject();
-               
+
                // most important is the root failure cause
                String rootException = graph.getFailureCauseAsString();
                if (rootException != null && 
!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
@@ -84,7 +88,7 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
 
                int numExceptionsSoFar = 0;
                boolean truncated = false;
-               
+
                for (AccessExecutionVertex task : 
graph.getAllExecutionVertices()) {
                        String t = task.getFailureCauseAsString();
                        if (t != null && 
!t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 5fcf010..d1aeea4 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.instance.ActorGateway;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -46,7 +47,7 @@ public class JobManagerConfigHandler extends 
AbstractJsonRequestHandler {
        @Override
        public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                gen.writeStartArray();
                for (String key : config.keySet()) {
@@ -54,9 +55,9 @@ public class JobManagerConfigHandler extends 
AbstractJsonRequestHandler {
                        gen.writeStringField("key", key);
 
                        // Mask key values which contain sensitive information
-                       if(key.toLowerCase().contains("password")) {
+                       if (key.toLowerCase().contains("password")) {
                                String value = config.getString(key, null);
-                               if(value != null) {
+                               if (value != null) {
                                        value = "******";
                                }
                                gen.writeStringField("value", value);

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index 885d04e..d17b6bb 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -34,7 +34,7 @@ import java.util.Map;
 public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 
        private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
-       
+
        public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
                super(executionGraphHolder);
        }
@@ -49,6 +49,9 @@ public class JobPlanHandler extends 
AbstractExecutionGraphRequestHandler {
                return graph.getJsonPlan();
        }
 
+       /**
+        * Archivist for the JobPlanHandler.
+        */
        public static class JobPlanJsonArchivist implements JsonArchivist {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 2532a1e..8e90dfc 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -27,6 +25,8 @@ import 
org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -34,11 +34,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-
+/**
+ * Request handler that returns the accummulators for a given vertex.
+ */
 public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandler {
 
        private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/accumulators";
-       
+
        public JobVertexAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
@@ -53,6 +55,9 @@ public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandle
                return createVertexAccumulatorsJson(jobVertex);
        }
 
+       /**
+        * Archivist for JobVertexAccumulatorsHandler.
+        */
        public static class JobVertexAccumulatorsJsonArchivist implements 
JsonArchivist {
 
                @Override
@@ -71,13 +76,13 @@ public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandle
 
        public static String 
createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws 
IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                StringifiedAccumulatorResult[] accs = 
jobVertex.getAggregatedUserAccumulatorsStringified();
 
                gen.writeStartObject();
                gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
-               
+
                gen.writeArrayFieldStart("user-accumulators");
                for (StringifiedAccumulatorResult acc : accs) {
                        gen.writeStartObject();
@@ -87,7 +92,7 @@ public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandle
                        gen.writeEndObject();
                }
                gen.writeEndArray();
-               
+
                gen.writeEndObject();
 
                gen.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index 52167e1..cde8ca9 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -18,18 +18,20 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
-import scala.Option;
+
+import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.Map;
 
+import scala.Option;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -72,7 +74,7 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
                }
                ExecutionJobVertex jobVertex = (ExecutionJobVertex) 
accessJobVertex;
                try (StringWriter writer = new StringWriter();
-                               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer)) {
+                               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
 
                        gen.writeStartObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index d9a1131..7757fdd 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -31,7 +29,10 @@ import 
org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -45,7 +46,7 @@ import java.util.Map;
  */
 public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 
-       private static String JOB_VERTEX_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid";
+       private static final String JOB_VERTEX_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid";
 
        private final MetricFetcher fetcher;
 
@@ -64,6 +65,9 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
                return createVertexDetailsJson(jobVertex, params.get("jobid"), 
fetcher);
        }
 
+       /**
+        * Archivist for the JobVertexDetailsHandler.
+        */
        public static class JobVertexDetailsJsonArchivist implements 
JsonArchivist {
 
                @Override
@@ -85,9 +89,9 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
                        String jobID,
                        @Nullable MetricFetcher fetcher) throws IOException {
                final long now = System.currentTimeMillis();
-               
+
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                gen.writeStartObject();
 
@@ -100,7 +104,7 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
                int num = 0;
                for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
                        final ExecutionState status = 
vertex.getExecutionState();
-                       
+
                        TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
                        String locationString = location == null ? 
"(unassigned)" : location.getHostname() + ":" + location.dataPort();
 
@@ -110,7 +114,7 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
                        }
                        long endTime = status.isTerminal() ? 
vertex.getStateTimestamp(status) : -1;
                        long duration = startTime > 0 ? ((endTime > 0 ? endTime 
: now) - startTime) : -1;
-                       
+
                        gen.writeStartObject();
                        gen.writeNumberField("subtask", num);
                        gen.writeStringField("status", status.name());
@@ -130,13 +134,13 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
                        );
 
                        counts.writeIOMetricsAsJson(gen);
-                       
+
                        gen.writeEndObject();
-                       
+
                        num++;
                }
                gen.writeEndArray();
-               
+
                gen.writeEndObject();
 
                gen.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index 3878722..a612782 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -31,7 +30,10 @@ import 
org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -65,6 +67,9 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
                return createVertexDetailsByTaskManagerJson(jobVertex, 
params.get("jobid"), fetcher);
        }
 
+       /**
+        * Archivist for JobVertexTaskManagersHandler.
+        */
        public static class JobVertexTaskManagersJsonArchivist implements 
JsonArchivist {
 
                @Override
@@ -86,7 +91,7 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
                        String jobID,
                        @Nullable MetricFetcher fetcher) throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                // Build a map that groups tasks by TaskManager
                Map<String, List<AccessExecutionVertex>> taskManagerVertices = 
new HashMap<>();
@@ -108,7 +113,6 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
                // Build JSON response
                final long now = System.currentTimeMillis();
 
-
                gen.writeStartObject();
 
                gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
index e886532..4ce0baf 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
@@ -26,11 +26,10 @@ package org.apache.flink.runtime.webmonitor.handlers;
 public class JsonFactory {
 
        /** The singleton Jackson JSON factory. */
-       public static final com.fasterxml.jackson.core.JsonFactory 
jacksonFactory =
+       public static final com.fasterxml.jackson.core.JsonFactory 
JACKSON_FACTORY =
                        new com.fasterxml.jackson.core.JsonFactory();
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
-       /** Don't instantiate */
+
        private JsonFactory() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index b6246e6..66c30af 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 
+import io.netty.handler.codec.http.FullHttpResponse;
+
 import java.util.Map;
 
 /**
@@ -36,13 +37,13 @@ public interface RequestHandler {
         * respond with a full http response, including content-type, 
content-length, etc.
         *
         * <p>Exceptions may be throws and will be handled.
-        * 
+        *
         * @param pathParams The map of REST path parameters, decoded by the 
router.
         * @param queryParams The map of query parameters.
         * @param jobManager The JobManager actor.
         *
         * @return The full http response.
-        * 
+        *
         * @throws Exception Handlers may forward exceptions. Exceptions of type
         *         {@link 
org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
         *         response with the exception message, other exceptions will 
cause a HTTP 500 response

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 4cf5f0f..28e9ddf 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -30,7 +30,7 @@ import java.util.Map;
 public class SubtaskCurrentAttemptDetailsHandler extends 
SubtaskExecutionAttemptDetailsHandler {
 
        public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
-       
+
        public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
                super(executionGraphHolder, fetcher);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 9026a22..171277f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -28,6 +27,8 @@ import 
org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -37,12 +38,12 @@ import java.util.Map;
 
 /**
  * Base class for request handlers whose response depends on a specific job 
vertex (defined
- * via the "vertexid" parameter) in a specific job, defined via (defined voa 
the "jobid" parameter).  
+ * via the "vertexid" parameter) in a specific job, defined via (defined voa 
the "jobid" parameter).
  */
 public class SubtaskExecutionAttemptAccumulatorsHandler extends 
AbstractSubtaskAttemptRequestHandler {
 
        private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
-       
+
        public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
@@ -56,7 +57,10 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
        public String handleRequest(AccessExecution execAttempt, Map<String, 
String> params) throws Exception {
                return createAttemptAccumulatorsJson(execAttempt);
        }
-               
+
+       /**
+        * Archivist for the SubtaskExecutionAttemptAccumulatorsHandler.
+        */
        public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist 
implements JsonArchivist {
 
                @Override
@@ -91,8 +95,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
 
        public static String createAttemptAccumulatorsJson(AccessExecution 
execAttempt) throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
-               
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
                final StringifiedAccumulatorResult[] accs = 
execAttempt.getUserAccumulatorsStringified();
 
                gen.writeStartObject();
@@ -100,7 +104,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
                gen.writeNumberField("subtask", 
execAttempt.getParallelSubtaskIndex());
                gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
                gen.writeStringField("id", 
execAttempt.getAttemptId().toString());
-               
+
                gen.writeArrayFieldStart("user-accumulators");
                for (StringifiedAccumulatorResult acc : accs) {
                        gen.writeStartObject();
@@ -110,9 +114,9 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
                        gen.writeEndObject();
                }
                gen.writeEndArray();
-               
+
                gen.writeEndObject();
-               
+
                gen.close();
                return writer.toString();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 078f54a..37c0e50 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -32,7 +30,10 @@ import 
org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -66,6 +67,9 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
                return createAttemptDetailsJson(execAttempt, 
params.get("jobid"), params.get("vertexid"), fetcher);
        }
 
+       /**
+        * Archivist for the SubtaskExecutionAttemptDetailsHandler.
+        */
        public static class SubtaskExecutionAttemptDetailsJsonArchivist 
implements JsonArchivist {
 
                @Override
@@ -83,7 +87,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
                                                .replace(":vertexid", 
task.getJobVertexId().toString())
                                                .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
                                                .replace(":attempt", 
String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
-                                       
+
                                        archive.add(new 
ArchivedJson(curAttemptPath1, curAttemptJson));
                                        archive.add(new 
ArchivedJson(curAttemptPath2, curAttemptJson));
 
@@ -109,7 +113,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
                        String vertexID,
                        @Nullable MetricFetcher fetcher) throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                final ExecutionState status = execAttempt.getState();
                final long now = System.currentTimeMillis();
@@ -141,7 +145,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
                        jobID,
                        vertexID
                );
-               
+
                counts.writeIOMetricsAsJson(gen);
 
                gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 6c3bc18..64bdfb4 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -29,6 +27,8 @@ import 
org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -42,7 +42,7 @@ import java.util.Map;
 public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHandler {
 
        private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH =       
"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
-       
+
        public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
                super(executionGraphHolder);
        }
@@ -57,6 +57,9 @@ public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHand
                return createSubtasksAccumulatorsJson(jobVertex);
        }
 
+       /**
+        * Archivist for the SubtasksAllAccumulatorsHandler.
+        */
        public static class SubtasksAllAccumulatorsJsonArchivist implements 
JsonArchivist {
 
                @Override
@@ -75,22 +78,22 @@ public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHand
 
        public static String 
createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws 
IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                gen.writeStartObject();
                gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
                gen.writeNumberField("parallelism", jobVertex.getParallelism());
 
                gen.writeArrayFieldStart("subtasks");
-               
+
                int num = 0;
                for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
 
                        TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
                        String locationString = location == null ? 
"(unassigned)" : location.getHostname();
-                       
+
                        gen.writeStartObject();
-                       
+
                        gen.writeNumberField("subtask", num++);
                        gen.writeNumberField("attempt", 
vertex.getCurrentExecutionAttempt().getAttemptNumber());
                        gen.writeStringField("host", locationString);
@@ -105,7 +108,7 @@ public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHand
                                gen.writeEndObject();
                        }
                        gen.writeEndArray();
-                       
+
                        gen.writeEndObject();
                }
                gen.writeEndArray();

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index adefa80..ea88587 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -29,6 +27,8 @@ import 
org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -43,7 +43,7 @@ import java.util.Map;
 public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 
        private static final String SUBTASK_TIMES_REST_PATH =   
"/jobs/:jobid/vertices/:vertexid/subtasktimes";
-       
+
        public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
                super(executionGraphHolder);
        }
@@ -58,6 +58,9 @@ public class SubtasksTimesHandler extends 
AbstractJobVertexRequestHandler {
                return createSubtaskTimesJson(jobVertex);
        }
 
+       /**
+        * Archivist for the SubtasksTimesHandler.
+        */
        public static class SubtasksTimesJsonArchivist implements JsonArchivist 
{
 
                @Override
@@ -78,28 +81,28 @@ public class SubtasksTimesHandler extends 
AbstractJobVertexRequestHandler {
                final long now = System.currentTimeMillis();
 
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                gen.writeStartObject();
 
                gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
                gen.writeStringField("name", jobVertex.getName());
                gen.writeNumberField("now", now);
-               
+
                gen.writeArrayFieldStart("subtasks");
 
                int num = 0;
                for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
-                       
+
                        long[] timestamps = 
vertex.getCurrentExecutionAttempt().getStateTimestamps();
                        ExecutionState status = vertex.getExecutionState();
 
                        long scheduledTime = 
timestamps[ExecutionState.SCHEDULED.ordinal()];
-                       
+
                        long start = scheduledTime > 0 ? scheduledTime : -1;
                        long end = status.isTerminal() ? 
timestamps[status.ordinal()] : now;
                        long duration = start >= 0 ? end - start : -1L;
-                       
+
                        gen.writeStartObject();
                        gen.writeNumberField("subtask", num++);
 
@@ -108,13 +111,13 @@ public class SubtasksTimesHandler extends 
AbstractJobVertexRequestHandler {
                        gen.writeStringField("host", locationString);
 
                        gen.writeNumberField("duration", duration);
-                       
+
                        gen.writeObjectFieldStart("timestamps");
                        for (ExecutionState state : ExecutionState.values()) {
                                gen.writeNumberField(state.name(), 
timestamps[state.ordinal()]);
                        }
                        gen.writeEndObject();
-                       
+
                        gen.writeEndObject();
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 53ee336..1084623 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -26,24 +26,6 @@ package org.apache.flink.runtime.webmonitor.handlers;
  * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
  *****************************************************************************/
 
-import akka.dispatch.Mapper;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.DefaultFileRegion;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.HttpChunkedInput;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.handler.codec.http.router.Routed;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.stream.ChunkedFile;
-import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -65,12 +47,27 @@ import 
org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
+
+import akka.dispatch.Mapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -81,6 +78,11 @@ import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
+import scala.Option;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
@@ -100,19 +102,19 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
        private static final String TASKMANAGER_LOG_REST_PATH = 
"/taskmanagers/:taskmanagerid/log";
        private static final String TASKMANAGER_OUT_REST_PATH = 
"/taskmanagers/:taskmanagerid/stdout";
 
-       /** Keep track of last transmitted log, to clean up old ones */
+       /** Keep track of last transmitted log, to clean up old ones. */
        private final HashMap<String, BlobKey> lastSubmittedLog = new 
HashMap<>();
        private final HashMap<String, BlobKey> lastSubmittedStdout = new 
HashMap<>();
 
-       /** Keep track of request status, prevents multiple log requests for a 
single TM running concurrently */
+       /** Keep track of request status, prevents multiple log requests for a 
single TM running concurrently. */
        private final ConcurrentHashMap<String, Boolean> lastRequestPending = 
new ConcurrentHashMap<>();
        private final Configuration config;
 
-       /** Future of the blob cache */
+       /** Future of the blob cache. */
        private Future<BlobCache> cache;
 
-       /** Indicates which log file should be displayed; true indicates .log, 
false indicates .out */
-       private boolean serveLogFile;
+       /** Indicates which log file should be displayed. */
+       private FileMode fileMode;
 
        private final ExecutionContextExecutor executor;
 
@@ -120,6 +122,7 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
 
        private final BlobView blobView;
 
+       /** Used to control whether this handler serves the .log or .out file. 
*/
        public enum FileMode {
                LOG,
                STDOUT
@@ -138,14 +141,7 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
 
                this.executor = checkNotNull(executor);
                this.config = config;
-               switch (fileMode) {
-                       case LOG:
-                               serveLogFile = true;
-                               break;
-                       case STDOUT:
-                               serveLogFile = false;
-                               break;
-               }
+               this.fileMode = fileMode;
 
                this.blobView = Preconditions.checkNotNull(blobView, 
"blobView");
 
@@ -154,10 +150,12 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
 
        @Override
        public String[] getPaths() {
-               if (serveLogFile) {
-                       return new String[]{TASKMANAGER_LOG_REST_PATH};
-               } else {
-                       return new String[]{TASKMANAGER_OUT_REST_PATH};
+               switch (fileMode) {
+                       case LOG:
+                               return new String[]{TASKMANAGER_LOG_REST_PATH};
+                       case STDOUT:
+                       default:
+                               return new String[]{TASKMANAGER_OUT_REST_PATH};
                }
        }
 
@@ -199,10 +197,12 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                                        public Future<BlobKey> 
apply(JobManagerMessages.TaskManagerInstance value) {
                                                Instance taskManager = 
value.instance().get();
 
-                                               if (serveLogFile) {
-                                                       return 
taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
-                                               } else {
-                                                       return 
taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
+                                               switch (fileMode) {
+                                                       case LOG:
+                                                               return 
taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
+                                                       case STDOUT:
+                                                       default:
+                                                               return 
taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
                                                }
                                        }
                                });
@@ -223,7 +223,7 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                                                        final BlobCache 
blobCache = value.f1;
 
                                                        //delete previous log 
file, if it is different than the current one
-                                                       HashMap<String, 
BlobKey> lastSubmittedFile = serveLogFile ? lastSubmittedLog : 
lastSubmittedStdout;
+                                                       HashMap<String, 
BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : 
lastSubmittedStdout;
                                                        if 
(lastSubmittedFile.containsKey(taskManagerID)) {
                                                                if 
(!blobKey.equals(lastSubmittedFile.get(taskManagerID))) {
                                                                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a23e983..6ad490e 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -28,28 +27,34 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 import org.apache.flink.util.StringUtils;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 import static java.util.Objects.requireNonNull;
 
+/**
+ * A request handler that provides an overview over all taskmanagers or 
details for a single one.
+ */
 public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
        private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
        private static final String TASKMANAGER_DETAILS_REST_PATH = 
"/taskmanagers/:taskmanagerid";
 
        public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
-       
+
        private final FiniteDuration timeout;
 
        private final MetricFetcher fetcher;
-       
+
        public TaskManagersHandler(FiniteDuration timeout, MetricFetcher 
fetcher) {
                this.timeout = requireNonNull(timeout);
                this.fetcher = fetcher;
@@ -88,7 +93,7 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
                                }
 
                                StringWriter writer = new StringWriter();
-                               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                                gen.writeStartObject();
                                gen.writeArrayFieldStart("taskmanagers");
@@ -112,17 +117,17 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
                                                
MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
                                                if (metrics != null) {
                                                        
gen.writeObjectFieldStart("metrics");
-                                                       long heapUsed = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-                                                       long heapCommitted = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-                                                       long heapTotal = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+                                                       long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+                                                       long heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+                                                       long heapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
 
                                                        
gen.writeNumberField("heapCommitted", heapCommitted);
                                                        
gen.writeNumberField("heapUsed", heapUsed);
                                                        
gen.writeNumberField("heapMax", heapTotal);
 
-                                                       long nonHeapUsed = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-                                                       long nonHeapCommitted = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-                                                       long nonHeapTotal = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+                                                       long nonHeapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+                                                       long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+                                                       long nonHeapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
 
                                                        
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
                                                        
gen.writeNumberField("nonHeapUsed", nonHeapUsed);

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 7914c29..d4c9b2a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -28,6 +27,8 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -55,6 +56,9 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandle
                return createCheckpointConfigJson(graph);
        }
 
+       /**
+        * Archivist for the CheckpointConfigHandler.
+        */
        public static class CheckpointConfigJsonArchivist implements 
JsonArchivist {
 
                @Override
@@ -68,7 +72,7 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandle
 
        private static String createCheckpointConfigJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
                JobCheckpointingSettings settings = 
graph.getJobCheckpointingSettings();
 
                if (settings == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
index 35d529a..9bbe8a7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 
 import javax.annotation.Nullable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 16fd9bd..664744b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -32,6 +31,8 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -86,6 +87,9 @@ public class CheckpointStatsDetailsHandler extends 
AbstractExecutionGraphRequest
                return createCheckpointDetailsJson(checkpoint);
        }
 
+       /**
+        * Archivist for the CheckpointStatsDetails.
+        */
        public static class CheckpointStatsDetailsJsonArchivist implements 
JsonArchivist {
 
                @Override
@@ -109,7 +113,7 @@ public class CheckpointStatsDetailsHandler extends 
AbstractExecutionGraphRequest
 
        public static String 
createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws 
IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
                gen.writeStartObject();
 
                gen.writeNumberField("id", checkpoint.getCheckpointId());

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index bb39b2c..f96e0c2 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -35,6 +34,8 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -107,10 +108,13 @@ public class CheckpointStatsDetailsSubtasksHandler 
extends AbstractExecutionGrap
                if (taskStats == null) {
                        return "{}";
                }
-               
+
                return createSubtaskCheckpointDetailsJson(checkpoint, 
taskStats);
        }
 
+       /**
+        * Archivist for the CheckpointStatsDetailsSubtasksHandler.
+        */
        public static class CheckpointStatsDetailsSubtasksJsonArchivist 
implements JsonArchivist {
 
                @Override
@@ -137,7 +141,7 @@ public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGrap
 
        private static String 
createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, 
TaskStateStats taskStats) throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                gen.writeStartObject();
                // Overview

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index f004888..a86c5fd 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
@@ -35,7 +34,10 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -63,6 +65,9 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
                return createCheckpointStatsJson(graph);
        }
 
+       /**
+        * Archivist for the CheckpointStatsJsonHandler.
+        */
        public static class CheckpointStatsJsonArchivist implements 
JsonArchivist {
 
                @Override
@@ -76,7 +81,7 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
 
        private static String createCheckpointStatsJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
                if (snapshot == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 3337370..d86bfb2 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.webmonitor.history;
 
-import io.netty.handler.codec.http.router.Router;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
@@ -34,10 +34,13 @@ import 
org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+
+import io.netty.handler.codec.http.router.Router;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
+
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -53,15 +56,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * The HistoryServer provides a WebInterface and REST API to retrieve 
information about finished jobs for which
  * the JobManager may have already shut down.
- * 
- * The HistoryServer regularly checks a set of directories for job archives 
created by the {@link FsJobArchivist} and
+ *
+ * <p>The HistoryServer regularly checks a set of directories for job archives 
created by the {@link FsJobArchivist} and
  * caches these in a local directory. See {@link HistoryServerArchiveFetcher}.
- * 
- * All configuration options are defined in{@link HistoryServerOptions}.
- * 
- * The WebInterface only displays the "Completed Jobs" page.
- * 
- * The REST API is limited to
+ *
+ * <p>All configuration options are defined in{@link HistoryServerOptions}.
+ *
+ * <p>The WebInterface only displays the "Completed Jobs" page.
+ *
+ * <p>The REST API is limited to
  * <ul>
  *     <li>/config</li>
  *     <li>/joboverview</li>
@@ -110,7 +113,7 @@ public class HistoryServer {
                        });
                        System.exit(0);
                } catch (UndeclaredThrowableException ute) {
-                       Throwable cause = ute. getUndeclaredThrowable();
+                       Throwable cause = ute.getUndeclaredThrowable();
                        LOG.error("Failed to run HistoryServer.", cause);
                        cause.printStackTrace();
                        System.exit(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 0ff9e02..0fc4314 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -15,14 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.runtime.webmonitor.history;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
+package org.apache.flink.runtime.webmonitor.history;
 
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.core.fs.FileStatus;
@@ -32,6 +27,11 @@ import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
 import org.apache.flink.util.FileUtils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,12 +49,14 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * This class is used by the {@link HistoryServer} to fetch the job archives 
that are located at
  * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories 
are polled in regular intervals, defined
  * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
- * 
- * The archives are downloaded and expanded into a file structure analog to 
the REST API defined in the WebRuntimeMonitor.
+ *
+ * <p>The archives are downloaded and expanded into a file structure analog to 
the REST API defined in the WebRuntimeMonitor.
  */
 class HistoryServerArchiveFetcher {
 
@@ -174,7 +176,7 @@ class HistoryServerArchiveFetcher {
                                                                        }
 
                                                                        
java.nio.file.Path targetPath = target.toPath();
-                                                                       
+
                                                                        // We 
overwrite existing files since this may be another attempt at fetching this 
archive.
                                                                        // 
Existing files may be incomplete/corrupt.
                                                                        if 
(Files.exists(targetPath)) {
@@ -224,10 +226,10 @@ class HistoryServerArchiveFetcher {
         * This method replicates the JSON response that would be given by the 
{@link CurrentJobsOverviewHandler} when
         * listing both running and finished jobs.
         *
-        * Every job archive contains a joboverview.json file containing the 
same structure. Since jobs are archived on
+        * <p>Every job archive contains a joboverview.json file containing the 
same structure. Since jobs are archived on
         * their own however the list of finished jobs only contains a single 
job.
         *
-        * For the display in the HistoryServer WebFrontend we have to combine 
these overviews.
+        * <p>For the display in the HistoryServer WebFrontend we have to 
combine these overviews.
         */
        private static void updateJobOverview(File webDir) {
                File webOverviewDir = new File(webDir, "overviews");

http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index ba0e2d2..c14f3d8 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -26,6 +26,8 @@ package org.apache.flink.runtime.webmonitor.history;
  * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
  *****************************************************************************/
 
+import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
@@ -41,7 +43,6 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedFile;
-import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,10 +71,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Simple file server handler used by the {@link HistoryServer} that serves 
requests to web frontend's static files,
  * such as HTML, CSS, JS or JSON files.
  *
- * This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
+ * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
  * example.
- * 
- * This class is a copy of the {@link StaticFileServerHandler}. The 
differences are that the request path is
+ *
+ * <p>This class is a copy of the {@link StaticFileServerHandler}. The 
differences are that the request path is
  * modified to end on ".json" if it does not have a filename extension; when 
"index.html" is requested we load
  * "index_hs.html" instead to inject the modified HistoryServer WebInterface 
and that the caching of the "/joboverview"
  * page is prevented.
@@ -81,12 +82,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @ChannelHandler.Sharable
 public class HistoryServerStaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed> {
 
-       /** Default logger, if none is specified */
+       /** Default logger, if none is specified. */
        private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class);
 
        // 
------------------------------------------------------------------------
 
-       /** The path in which the static documents are */
+       /** The path in which the static documents are. */
        private final File rootPath;
 
        public HistoryServerStaticFileServerHandler(File rootPath) throws 
IOException {

Reply via email to