http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
new file mode 100644
index 0000000..8f21af0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the JobCancellationWithSavepointHandlers.
+ */
+public class JobCancellationWithSavepointHandlersTest extends TestLogger {
+
+       private static final Executor executor = Executors.directExecutor();
+
+       @Test
+       public void testGetPaths() {
+               JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor);
+
+               JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler();
+               String[] triggerPaths = triggerHandler.getPaths();
+               Assert.assertEquals(2, triggerPaths.length);
+               List<String> triggerPathsList = Arrays.asList(triggerPaths);
+               Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint"));
+               Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"));
+
+               JobCancellationWithSavepointHandlers.InProgressHandler progressHandler = handler.getInProgressHandler();
+               String[] progressPaths = progressHandler.getPaths();
+               Assert.assertEquals(1, progressPaths.length);
+               Assert.assertEquals("/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId", progressPaths[0]);
+       }
+
+       /**
+        * Tests that the cancellation ask timeout respects the checkpoint timeout.
+        * Otherwise, AskTimeoutExceptions are bound to happen for large state.
+        */
+       @Test
+       public void testAskTimeoutEqualsCheckpointTimeout() throws Exception {
+               long timeout = 128288238L;
+               JobID jobId = new JobID();
+               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+               when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+               when(graph.getCheckpointCoordinator()).thenReturn(coord);
+               when(coord.getCheckpointTimeout()).thenReturn(timeout);
+
+               JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
+               JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
+
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", jobId.toString());
+               params.put("targetDirectory", "placeholder");
+
+               JobManagerGateway jobManager = mock(JobManagerGateway.class);
+               when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
+
+               handler.handleRequest(params, Collections.emptyMap(), jobManager);
+
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class));
+       }
+
+       /**
+        * Tests that the savepoint directory configuration is respected.
+        */
+       @Test
+       public void testSavepointDirectoryConfiguration() throws Exception {
+               long timeout = 128288238L;
+               JobID jobId = new JobID();
+               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+               when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+               when(graph.getCheckpointCoordinator()).thenReturn(coord);
+               when(coord.getCheckpointTimeout()).thenReturn(timeout);
+
+               JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
+               JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
+
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", jobId.toString());
+
+               JobManagerGateway jobManager = mock(JobManagerGateway.class);
+               when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
+
+               // 1. Use targetDirectory path param
+               params.put("targetDirectory", "custom-directory");
+               handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+               // 2. Use default
+               params.remove("targetDirectory");
+
+               handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class));
+
+               // 3. Throw Exception
+               handlers = new JobCancellationWithSavepointHandlers(holder, executor, null);
+               handler = handlers.getTriggerHandler();
+
+               try {
+                       handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+                       fail("Did not throw expected test Exception");
+               } catch (Exception e) {
+                       IllegalStateException cause = (IllegalStateException) e.getCause();
+                       assertEquals(true, cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key()));
+               }
+       }
+
+       /**
+        * Tests triggering a new request and monitoring it.
+        */
+       @Test
+       public void testTriggerNewRequest() throws Exception {
+               JobID jobId = new JobID();
+               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+               when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+               when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+               JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
+               JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
+               JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
+
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", jobId.toString());
+               params.put("targetDirectory", "custom-directory");
+
+               JobManagerGateway jobManager = mock(JobManagerGateway.class);
+
+               // Successful
+               CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>();
+               when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
+
+               // Trigger
+               FullHttpResponse response = trigger.handleRequest(params, Collections.emptyMap(), jobManager).get();
+
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+               String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
+
+               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+               assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+               assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
+
+               String json = response.content().toString(Charset.forName("UTF-8"));
+               JsonNode root = new ObjectMapper().readTree(json);
+
+               assertEquals("accepted", root.get("status").asText());
+               assertEquals("1", root.get("request-id").asText());
+               assertEquals(location, root.get("location").asText());
+
+               // Trigger again
+               response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+               assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+               assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("accepted", root.get("status").asText());
+               assertEquals("1", root.get("request-id").asText());
+               assertEquals(location, root.get("location").asText());
+
+               // Only single actual request
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+               // Query progress
+               params.put("requestId", "1");
+
+               response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+               assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("in-progress", root.get("status").asText());
+               assertEquals("1", root.get("request-id").asText());
+
+               // Complete
+               successfulCancelWithSavepoint.complete("_path-savepoint_");
+
+               response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+
+               assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+               assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("success", root.get("status").asText());
+               assertEquals("1", root.get("request-id").asText());
+               assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
+
+               // Query again, keep recent history
+
+               response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+
+               assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+               assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("success", root.get("status").asText());
+               assertEquals("1", root.get("request-id").asText());
+               assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
+
+               // Query for unknown request
+               params.put("requestId", "9929");
+
+               response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+               assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
+               assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("failed", root.get("status").asText());
+               assertEquals("9929", root.get("request-id").asText());
+               assertEquals("Unknown job/request ID", root.get("cause").asText());
+       }
+
+       /**
+        * Tests response when a request fails.
+        */
+       @Test
+       public void testFailedCancellation() throws Exception {
+               JobID jobId = new JobID();
+               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+               when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+               when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+               JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
+               JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
+               JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
+
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", jobId.toString());
+               params.put("targetDirectory", "custom-directory");
+
+               JobManagerGateway jobManager = mock(JobManagerGateway.class);
+
+               // Unsuccessful
+               CompletableFuture<String> unsuccessfulCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception"));
+               when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint);
+
+               // Trigger
+               trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+               // Query progress
+               params.put("requestId", "1");
+
+               FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+               assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
+               assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               String json = response.content().toString(Charset.forName("UTF-8"));
+               JsonNode root = new ObjectMapper().readTree(json);
+
+               assertEquals("failed", root.get("status").asText());
+               assertEquals("1", root.get("request-id").asText());
+               assertEquals("Test Exception", root.get("cause").asText());
+       }
+}
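
Taken together, the assertions above document the client contract for
cancel-with-savepoint: the trigger URL answers 202 ACCEPTED with a "location"
pointing at the in-progress URL, which then reports "in-progress", "success"
(with a "savepoint-path") or "failed" (with a "cause"). A minimal polling-client
sketch follows; the base URL, job id and the use of Jackson are illustrative
assumptions, while the paths, fields and status values come from the test:

    // Hypothetical client sketch, not part of this commit.
    String base = "http://localhost:8081"; // assumed web monitor address
    String jobId = "<job id>";
    ObjectMapper mapper = new ObjectMapper();
    JsonNode accepted = mapper.readTree(new URL(base + "/jobs/" + jobId + "/cancel-with-savepoint"));
    URL inProgress = new URL(base + accepted.get("location").asText());
    JsonNode result = mapper.readTree(inProgress);
    while ("in-progress".equals(result.get("status").asText())) {
        Thread.sleep(1000); // polling interval is an arbitrary choice
        result = mapper.readTree(inProgress);
    }
    String savepointPath = "success".equals(result.get("status").asText())
        ? result.get("savepoint-path").asText()
        : null; // on "failed", the "cause" field carries the error message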

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
new file mode 100644
index 0000000..567df8c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobConfigHandler.
+ */
+public class JobConfigHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new JobConfigHandler.JobConfigJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/config", archive.getPath());
+               compareJobConfig(originalJob, archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/config", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               String answer = JobConfigHandler.createJobConfigJson(originalJob);
+               compareJobConfig(originalJob, answer);
+       }
+
+       private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException {
+               JsonNode job = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
+
+               Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText());
+               Assert.assertEquals(originalJob.getJobName(), job.get("name").asText());
+
+               ArchivedExecutionConfig originalConfig = originalJob.getArchivedExecutionConfig();
+               JsonNode config = job.get("execution-config");
+
+               Assert.assertEquals(originalConfig.getExecutionMode(), config.get("execution-mode").asText());
+               Assert.assertEquals(originalConfig.getRestartStrategyDescription(), config.get("restart-strategy").asText());
+               Assert.assertEquals(originalConfig.getParallelism(), config.get("job-parallelism").asInt());
+               Assert.assertEquals(originalConfig.getObjectReuseEnabled(), config.get("object-reuse-mode").asBoolean());
+
+               Map<String, String> originalUserConfig = originalConfig.getGlobalJobParameters();
+               JsonNode userConfig = config.get("user-config");
+
+               for (Map.Entry<String, String> originalEntry : originalUserConfig.entrySet()) {
+                       Assert.assertEquals(originalEntry.getValue(), userConfig.get(originalEntry.getKey()).asText());
+               }
+       }
+}
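
The compareJobConfig assertions pin down the shape of the /jobs/:jobid/config
response. Below is a sketch of assembling an equivalent document with Jackson;
the field names are taken from the test, the concrete values are placeholders:

    // Hypothetical response-building sketch, not part of this commit.
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode job = mapper.createObjectNode();
    job.put("jid", "<job id>");
    job.put("name", "<job name>");
    ObjectNode config = job.putObject("execution-config");
    config.put("execution-mode", "<execution mode>");
    config.put("restart-strategy", "<restart strategy description>");
    config.put("job-parallelism", 1);
    config.put("object-reuse-mode", false);
    config.putObject("user-config").put("<param key>", "<param value>");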

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
new file mode 100644
index 0000000..afd743e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobDetailsHandler.
+ */
+public class JobDetailsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new JobDetailsHandler.JobDetailsJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(2, archives.size());
+
+               Iterator<ArchivedJson> iterator = archives.iterator();
+               ArchivedJson archive1 = iterator.next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID(), archive1.getPath());
+               compareJobDetails(originalJob, archive1.getJson());
+
+               ArchivedJson archive2 = iterator.next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices", archive2.getPath());
+               compareJobDetails(originalJob, archive2.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(2, paths.length);
+               List<String> pathsList = Lists.newArrayList(paths);
+               Assert.assertTrue(pathsList.contains("/jobs/:jobid"));
+               Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices"));
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               String json = JobDetailsHandler.createJobDetailsJson(originalJob, null);
+
+               compareJobDetails(originalJob, json);
+       }
+
+       private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException {
+               JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText());
+               Assert.assertEquals(originalJob.getJobName(), result.get("name").asText());
+               Assert.assertEquals(originalJob.isStoppable(), result.get("isStoppable").asBoolean());
+               Assert.assertEquals(originalJob.getState().name(), result.get("state").asText());
+
+               Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), result.get("start-time").asLong());
+               Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), result.get("end-time").asLong());
+               Assert.assertEquals(
+                       originalJob.getStatusTimestamp(originalJob.getState()) - originalJob.getStatusTimestamp(JobStatus.CREATED),
+                       result.get("duration").asLong()
+               );
+
+               JsonNode timestamps = result.get("timestamps");
+               for (JobStatus status : JobStatus.values()) {
+                       Assert.assertEquals(originalJob.getStatusTimestamp(status), timestamps.get(status.name()).asLong());
+               }
+
+               ArrayNode tasks = (ArrayNode) result.get("vertices");
+               int x = 0;
+               for (AccessExecutionJobVertex expectedTask : originalJob.getVerticesTopologically()) {
+                       JsonNode task = tasks.get(x);
+
+                       Assert.assertEquals(expectedTask.getJobVertexId().toString(), task.get("id").asText());
+                       Assert.assertEquals(expectedTask.getName(), task.get("name").asText());
+                       Assert.assertEquals(expectedTask.getParallelism(), task.get("parallelism").asInt());
+                       Assert.assertEquals(expectedTask.getAggregateState().name(), task.get("status").asText());
+
+                       Assert.assertEquals(3, task.get("start-time").asLong());
+                       Assert.assertEquals(5, task.get("end-time").asLong());
+                       Assert.assertEquals(2, task.get("duration").asLong());
+
+                       JsonNode subtasksPerState = task.get("tasks");
+                       Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CREATED.name()).asInt());
+                       Assert.assertEquals(0, subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt());
+                       Assert.assertEquals(0, subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt());
+                       Assert.assertEquals(0, subtasksPerState.get(ExecutionState.RUNNING.name()).asInt());
+                       Assert.assertEquals(1, subtasksPerState.get(ExecutionState.FINISHED.name()).asInt());
+                       Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELING.name()).asInt());
+                       Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELED.name()).asInt());
+                       Assert.assertEquals(0, subtasksPerState.get(ExecutionState.FAILED.name()).asInt());
+
+                       long expectedNumBytesIn = 0;
+                       long expectedNumBytesOut = 0;
+                       long expectedNumRecordsIn = 0;
+                       long expectedNumRecordsOut = 0;
+
+                       for (AccessExecutionVertex vertex : expectedTask.getTaskVertices()) {
+                               IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+                               expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+                               expectedNumBytesOut += ioMetrics.getNumBytesOut();
+                               expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
+                               expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
+                       }
+
+                       JsonNode metrics = task.get("metrics");
+
+                       Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
+                       Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
+                       Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
+                       Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
+
+                       x++;
+               }
+               Assert.assertEquals(1, tasks.size());
+
+               JsonNode statusCounts = result.get("status-counts");
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+               Assert.assertEquals(1, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
+
+               Assert.assertEquals(ArchivedJobGenerationUtils.MAPPER.readTree(originalJob.getJsonPlan()), result.get("plan"));
+       }
+}
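
The per-vertex "tasks" object asserted above counts subtasks by ExecutionState.
A sketch of that aggregation as implied by the assertions (the handler's actual
implementation may differ):

    // Hypothetical aggregation sketch, not part of this commit.
    // task is an AccessExecutionJobVertex.
    int[] countsPerState = new int[ExecutionState.values().length];
    for (AccessExecutionVertex vertex : task.getTaskVertices()) {
        countsPerState[vertex.getExecutionState().ordinal()]++;
    }
    // Each count is then emitted under its state's name, e.g. "FINISHED": 1.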

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
new file mode 100644
index 0000000..6a20696
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobExceptionsHandler.
+ */
+public class JobExceptionsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath());
+               compareExceptions(originalJob, archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               String json = JobExceptionsHandler.createJobExceptionsJson(originalJob);
+
+               compareExceptions(originalJob, json);
+       }
+
+       private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
+               JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
+               Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
+
+               ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
+
+               int x = 0;
+               for (AccessExecutionVertex expectedSubtask : originalJob.getAllExecutionVertices()) {
+                       if (!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+                               JsonNode exception = exceptions.get(x);
+
+                               Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText());
+                               Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), exception.get("timestamp").asLong());
+                               Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText());
+
+                               TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
+                               String expectedLocationString = location.getFQDNHostname() + ':' + location.dataPort();
+                               Assert.assertEquals(expectedLocationString, exception.get("location").asText());
+                       }
+                       x++;
+               }
+               Assert.assertEquals(x > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, result.get("truncated").asBoolean());
+       }
+}
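
The final assertion above implies that "truncated" is set exactly when the number
of execution vertices exceeds JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT.
A sketch of that guard as implied by the test (the handler's actual code may differ):

    // Hypothetical truncation sketch, not part of this commit.
    // graph is an AccessExecutionGraph.
    int numVertices = 0;
    for (AccessExecutionVertex vertex : graph.getAllExecutionVertices()) {
        numVertices++;
        if (numVertices <= JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT) {
            // emit this subtask's entry into "all-exceptions" (failed subtasks only)
        }
    }
    boolean truncated = numVertices > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT;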

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
new file mode 100644
index 0000000..03ddb73
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the JobManagerConfigHandler.
+ */
+public class JobManagerConfigHandlerTest {
+       @Test
+       public void testGetPaths() {
+               JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobmanager/config", paths[0]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
new file mode 100644
index 0000000..6d3b213
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobPlanHandler.
+ */
+public class JobPlanHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new JobPlanHandler.JobPlanJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/plan", archive.getPath());
+               Assert.assertEquals(originalJob.getJsonPlan(), archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/plan", paths[0]);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
new file mode 100644
index 0000000..2c39fcf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for the JobStoppingHandler.
+ */
+public class JobStoppingHandlerTest extends TestLogger {
+       @Test
+       public void testGetPaths() {
+               JobStoppingHandler handler = new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(2, paths.length);
+               List<String> pathsList = Lists.newArrayList(paths);
+               Assert.assertTrue(pathsList.contains("/jobs/:jobid/stop"));
+               Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-stop"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..feffe60
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexAccumulatorsHandler.
+ */
+public class JobVertexAccumulatorsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/accumulators", archive.getPath());
+               compareAccumulators(originalTask, archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+               String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
+
+               compareAccumulators(originalTask, json);
+       }
+
+       private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
+               JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+
+               ArrayNode accs = (ArrayNode) result.get("user-accumulators");
+               StringifiedAccumulatorResult[] expectedAccs = originalTask.getAggregatedUserAccumulatorsStringified();
+
+               ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
new file mode 100644
index 0000000..bd6817f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for back pressure handler responses.
+ */
+public class JobVertexBackPressureHandlerTest {
+       @Test
+       public void testGetPaths() {
+               JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
+       }
+
+       /** Tests the response when no stats are available. */
+       @Test
+       public void testResponseNoStatsAvailable() throws Exception {
+               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+               BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+               when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+                               .thenReturn(Optional.empty());
+
+               JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+                               mock(ExecutionGraphHolder.class),
+                               Executors.directExecutor(),
+                               statsTracker,
+                               9999);
+
+               String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+               ObjectMapper mapper = new ObjectMapper();
+               JsonNode rootNode = mapper.readTree(response);
+
+               // Single element
+               assertEquals(1, rootNode.size());
+
+               // Status
+               JsonNode status = rootNode.get("status");
+               assertNotNull(status);
+               assertEquals("deprecated", status.textValue());
+
+               verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+       }
+
+       /** Tests the response when stats are available. */
+       @Test
+       public void testResponseStatsAvailable() throws Exception {
+               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+               BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+               OperatorBackPressureStats stats = new OperatorBackPressureStats(
+                               0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+               when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+                               .thenReturn(Optional.of(stats));
+
+               JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+                               mock(ExecutionGraphHolder.class),
+                               Executors.directExecutor(),
+                               statsTracker,
+                               9999);
+
+               String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+               ObjectMapper mapper = new ObjectMapper();
+               JsonNode rootNode = mapper.readTree(response);
+
+               // Four elements: status, backpressure-level, end-timestamp, subtasks
+               assertEquals(4, rootNode.size());
+
+               // Status
+               JsonNode status = rootNode.get("status");
+               assertNotNull(status);
+               assertEquals("ok", status.textValue());
+
+               // Back pressure level
+               JsonNode backPressureLevel = rootNode.get("backpressure-level");
+               assertNotNull(backPressureLevel);
+               assertEquals("high", backPressureLevel.textValue());
+
+               // End time stamp
+               JsonNode endTimeStamp = rootNode.get("end-timestamp");
+               assertNotNull(endTimeStamp);
+               assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+               // Subtasks
+               JsonNode subTasks = rootNode.get("subtasks");
+               assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+               for (int i = 0; i < subTasks.size(); i++) {
+                       JsonNode subTask = subTasks.get(i);
+
+                       JsonNode index = subTask.get("subtask");
+                       assertEquals(i, index.intValue());
+
+                       JsonNode level = subTask.get("backpressure-level");
+                       assertEquals(JobVertexBackPressureHandler
+                                       .getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+                       JsonNode ratio = subTask.get("ratio");
+                       assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+               }
+
+               // Verify not triggered
+               verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
+       }
+
+       /** Tests that after the refresh interval another sample is triggered. */
+       @Test
+       public void testResponsePassedRefreshInterval() throws Exception {
+               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+               BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+               OperatorBackPressureStats stats = new OperatorBackPressureStats(
+                               0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+               when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+                               .thenReturn(Optional.of(stats));
+
+               JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+                               mock(ExecutionGraphHolder.class),
+                               Executors.directExecutor(),
+                               statsTracker,
+                               0); // <----- refresh interval should fire immediately
+
+               String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+               ObjectMapper mapper = new ObjectMapper();
+               JsonNode rootNode = mapper.readTree(response);
+
+               // Four elements: status, backpressure-level, end-timestamp, subtasks
+               assertEquals(4, rootNode.size());
+
+               // Status
+               JsonNode status = rootNode.get("status");
+               assertNotNull(status);
+               // Interval passed, hence deprecated
+               assertEquals("deprecated", status.textValue());
+
+               // Back pressure level
+               JsonNode backPressureLevel = rootNode.get("backpressure-level");
+               assertNotNull(backPressureLevel);
+               assertEquals("high", backPressureLevel.textValue());
+
+               // End time stamp
+               JsonNode endTimeStamp = rootNode.get("end-timestamp");
+               assertNotNull(endTimeStamp);
+               assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+               // Subtasks
+               JsonNode subTasks = rootNode.get("subtasks");
+               assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+               for (int i = 0; i < subTasks.size(); i++) {
+                       JsonNode subTask = subTasks.get(i);
+
+                       JsonNode index = subTask.get("subtask");
+                       assertEquals(i, index.intValue());
+
+                       JsonNode level = subTask.get("backpressure-level");
+                       assertEquals(JobVertexBackPressureHandler
+                                       .getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+                       JsonNode ratio = subTask.get("ratio");
+                       assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+               }
+
+               // Verify triggered
+               verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+       }
+}
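
Both stats tests above assert a vertex-level "backpressure-level" of "high" for the ratios { 0.31, 0.48, 1.0, 0.0 }. A minimal sketch of how such a level could be derived, assuming the level comes from the maximum subtask ratio; the 0.10/0.50 thresholds and the class name are illustrative assumptions, not taken from JobVertexBackPressureHandler:

    // Hypothetical sketch; thresholds are assumptions, not the handler's actual values.
    public final class BackPressureLevelSketch {

        static String getBackPressureLevel(double ratio) {
            // ratio = fraction of samples in which the subtask was blocked on output
            if (ratio <= 0.10) {
                return "ok";
            } else if (ratio <= 0.50) {
                return "low";
            } else {
                return "high";
            }
        }

        public static void main(String[] args) {
            double max = 0.0;
            for (double r : new double[] {0.31, 0.48, 1.0, 0.0}) {
                max = Math.max(max, r);
            }
            // With the test data above, max == 1.0, which maps to "high".
            System.out.println(getBackPressureLevel(max));
        }
    }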

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
new file mode 100644
index 0000000..5af1d53
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexDetailsHandler.
+ */
+public class JobVertexDetailsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId(), archive.getPath());
+               compareVertexDetails(originalTask, archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+               String json = JobVertexDetailsHandler.createVertexDetailsJson(
+                       originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+               compareVertexDetails(originalTask, json);
+       }
+
+       private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException {
+               JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+               Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+               Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
+               Assert.assertTrue(result.get("now").asLong() > 0);
+
+               ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+               Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
+               for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
+                       AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
+                       JsonNode subtask = subtasks.get(x);
+
+                       Assert.assertEquals(x, subtask.get("subtask").asInt());
+                       Assert.assertEquals(expectedSubtask.getExecutionState().name(), subtask.get("status").asText());
+                       Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
+
+                       TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
+                       String expectedLocationString = location.getHostname() + ":" + location.dataPort();
+                       Assert.assertEquals(expectedLocationString, subtask.get("host").asText());
+                       long start = expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING);
+                       Assert.assertEquals(start, subtask.get("start-time").asLong());
+                       long end = expectedSubtask.getStateTimestamp(ExecutionState.FINISHED);
+                       Assert.assertEquals(end, subtask.get("end-time").asLong());
+                       Assert.assertEquals(end - start, subtask.get("duration").asLong());
+
+                       ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(), subtask.get("metrics"));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
new file mode 100644
index 0000000..2a027fd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexTaskManagersHandler.
+ */
+public class JobVertexTaskManagersHandlerTest extends TestLogger {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+               AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", archive.getPath());
+               compareVertexTaskManagers(originalTask, originalSubtask, archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+               AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+               String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
+                       originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+               compareVertexTaskManagers(originalTask, originalSubtask, json);
+       }
+
+       private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException {
+               JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+               Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+               Assert.assertTrue(result.get("now").asLong() > 0);
+
+               ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers");
+
+               JsonNode taskManager = taskmanagers.get(0);
+
+               TaskManagerLocation location = originalSubtask.getCurrentAssignedResourceLocation();
+               String expectedLocationString = location.getHostname() + ':' + location.dataPort();
+               Assert.assertEquals(expectedLocationString, taskManager.get("host").asText());
+               Assert.assertEquals(ExecutionState.FINISHED.name(), taskManager.get("status").asText());
+
+               Assert.assertEquals(3, taskManager.get("start-time").asLong());
+               Assert.assertEquals(5, taskManager.get("end-time").asLong());
+               Assert.assertEquals(2, taskManager.get("duration").asLong());
+
+               JsonNode statusCounts = taskManager.get("status-counts");
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+               Assert.assertEquals(1, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+               Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
+
+               long expectedNumBytesIn = 0;
+               long expectedNumBytesOut = 0;
+               long expectedNumRecordsIn = 0;
+               long expectedNumRecordsOut = 0;
+
+               for (AccessExecutionVertex vertex : originalTask.getTaskVertices()) {
+                       IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+                       expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+                       expectedNumBytesOut += ioMetrics.getNumBytesOut();
+                       expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
+                       expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
+               }
+
+               JsonNode metrics = taskManager.get("metrics");
+
+               Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
+               Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
+               Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
+               Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
+       }
+}
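
The compareVertexTaskManagers check above sums IO metrics over all subtasks and reads the sums back from the "metrics" object. A self-contained sketch of that aggregation and of the field names the test asserts; the helper class is hypothetical, only the getters already used in the test are assumed to exist:

    import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
    import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
    import org.apache.flink.runtime.executiongraph.IOMetrics;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    // Hypothetical helper mirroring the aggregation asserted in the test above.
    final class TaskManagerMetricsSketch {

        static ObjectNode aggregateMetrics(AccessExecutionJobVertex task) {
            long numBytesIn = 0;
            long numBytesOut = 0;
            long numRecordsIn = 0;
            long numRecordsOut = 0;

            // Bytes-in counts both local and remote input channels.
            for (AccessExecutionVertex vertex : task.getTaskVertices()) {
                IOMetrics io = vertex.getCurrentExecutionAttempt().getIOMetrics();
                numBytesIn += io.getNumBytesInLocal() + io.getNumBytesInRemote();
                numBytesOut += io.getNumBytesOut();
                numRecordsIn += io.getNumRecordsIn();
                numRecordsOut += io.getNumRecordsOut();
            }

            // Field names match those read back by compareVertexTaskManagers.
            ObjectNode metrics = new ObjectMapper().createObjectNode();
            metrics.put("read-bytes", numBytesIn);
            metrics.put("write-bytes", numBytesOut);
            metrics.put("read-records", numRecordsIn);
            metrics.put("write-records", numRecordsOut);
            return metrics;
        }
    }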

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..9e0d549
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskCurrentAttemptDetailsHandler.
+ */
+public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
+       @Test
+       public void testGetPaths() {
+               SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);
+       }
+}
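
The handler paths throughout these tests use colon-prefixed placeholders. As a usage illustration only, a concrete request URL is the template with each placeholder substituted; the IDs below are made up, not taken from the test fixtures:

    // Illustration only; the hex IDs are fabricated examples.
    public final class SubtaskPathExample {
        public static void main(String[] args) {
            String template = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
            String url = template
                    .replace(":jobid", "6b2f1e0f6d1c4a5b9e3d7c8a2f4b6d8e")
                    .replace(":vertexid", "cbc357ccb763df2852fee8c4fc7d55f2")
                    .replace(":subtasknum", "0");
            System.out.println(url);
        }
    }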

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..49e54c0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskExecutionAttemptAccumulatorsHandler.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals(
+                       "/jobs/" + originalJob.getJobID() +
+                       "/vertices/" + originalTask.getJobVertexId() +
+                       "/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+                       "/attempts/" + originalAttempt.getAttemptNumber() +
+                       "/accumulators",
+                       archive.getPath());
+               compareAttemptAccumulators(originalAttempt, archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+               String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
+
+               compareAttemptAccumulators(originalAttempt, json);
+       }
+
+       private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException {
+               JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
+               Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
+               Assert.assertEquals(originalAttempt.getAttemptId().toString(), result.get("id").asText());
+
+               ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(), (ArrayNode) result.get("user-accumulators"));
+       }
+}
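
For reference, the top-level JSON shape these assertions imply. This is a sketch only: the real serialization lives in createAttemptAccumulatorsJson, and the per-accumulator field names ("name", "type", "value") are assumptions based on what compareStringifiedAccumulators presumably checks:

    import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
    import org.apache.flink.runtime.executiongraph.AccessExecution;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    // Hypothetical restatement of the layout the test reads back.
    final class AttemptAccumulatorsJsonSketch {

        static ObjectNode toJson(AccessExecution attempt) {
            ObjectNode root = new ObjectMapper().createObjectNode();
            root.put("subtask", attempt.getParallelSubtaskIndex());
            root.put("attempt", attempt.getAttemptNumber());
            root.put("id", attempt.getAttemptId().toString());

            // Per-entry field names are assumptions, not confirmed by this diff.
            ArrayNode accumulators = root.putArray("user-accumulators");
            for (StringifiedAccumulatorResult acc : attempt.getUserAccumulatorsStringified()) {
                ObjectNode entry = accumulators.addObject();
                entry.put("name", acc.getName());
                entry.put("type", acc.getType());
                entry.put("value", acc.getValue());
            }
            return root;
        }
    }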

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..e1fe8b5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskExecutionAttemptDetailsHandler.
+ */
+public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(2, archives.size());
+
+               Iterator<ArchivedJson> iterator = archives.iterator();
+               ArchivedJson archive1 = iterator.next();
+               Assert.assertEquals(
+                       "/jobs/" + originalJob.getJobID() +
+                               "/vertices/" + originalTask.getJobVertexId() +
+                               "/subtasks/" + originalAttempt.getParallelSubtaskIndex(),
+                       archive1.getPath());
+               compareAttemptDetails(originalAttempt, archive1.getJson());
+
+               ArchivedJson archive2 = iterator.next();
+               Assert.assertEquals(
+                       "/jobs/" + originalJob.getJobID() +
+                               "/vertices/" + originalTask.getJobVertexId() +
+                               "/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+                               "/attempts/" + originalAttempt.getAttemptNumber(),
+                       archive2.getPath());
+                       archive2.getPath());
+               compareAttemptDetails(originalAttempt, archive2.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+               String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
+                       originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null);
+
+               compareAttemptDetails(originalAttempt, json);
+       }
+
+       private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException {
+               JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
+               Assert.assertEquals(originalAttempt.getState().name(), result.get("status").asText());
+               Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
+               Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), result.get("host").asText());
+               long start = originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+               Assert.assertEquals(start, result.get("start-time").asLong());
+               long end = originalAttempt.getStateTimestamp(ExecutionState.FINISHED);
+               Assert.assertEquals(end, result.get("end-time").asLong());
+               Assert.assertEquals(end - start, result.get("duration").asLong());
+
+               ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), result.get("metrics"));
+       }
+}
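
testArchiver above expects two entries because the same attempt JSON is reachable under two URLs: the bare subtask path (serving the current attempt) and the fully qualified attempt path. A small sketch of the path pair; the helper class is hypothetical, built only from getters the test already uses:

    import org.apache.flink.runtime.executiongraph.AccessExecution;
    import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;

    // Hypothetical helper showing the two URL forms; not handler code.
    final class AttemptPathsSketch {

        static String[] paths(AccessExecutionGraph job, AccessExecutionJobVertex task, AccessExecution attempt) {
            String base = "/jobs/" + job.getJobID()
                    + "/vertices/" + task.getJobVertexId()
                    + "/subtasks/" + attempt.getParallelSubtaskIndex();
            // First entry: the subtask URL, i.e. the current-attempt view.
            // Second entry: the same JSON under the explicit attempt URL.
            return new String[] {base, base + "/attempts/" + attempt.getAttemptNumber()};
        }
    }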

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..1478f00
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtasksAllAccumulatorsHandler.
+ */
+public class SubtasksAllAccumulatorsHandlerTest extends TestLogger {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
+               AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() +
+                       "/subtasks/accumulators", archive.getPath());
+               compareSubtaskAccumulators(originalTask, archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+               String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
+               compareSubtaskAccumulators(originalTask, json);
+       }
+
+       private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
+               JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+               Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
+
+               ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+               Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
+               for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
+                       JsonNode subtask = subtasks.get(x);
+                       AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
+
+                       Assert.assertEquals(x, subtask.get("subtask").asInt());
+                       Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
+                       Assert.assertEquals(expectedSubtask.getCurrentAssignedResourceLocation().getHostname(), subtask.get("host").asText());
+
+                       ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+                               expectedSubtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified(),
+                               (ArrayNode) subtask.get("user-accumulators"));
+               }
+       }
+}
