http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
deleted file mode 100644
index e34631e..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.TestLogger;
-
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the JobCancellationWithSavepointHandler.
- */
-public class JobCancellationWithSavepointHandlersTest extends TestLogger {
-
-       private static final Executor executor = Executors.directExecutor();
-
-       @Test
-       public void testGetPaths() {
-               JobCancellationWithSavepointHandlers handler = new 
JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), 
executor);
-
-               JobCancellationWithSavepointHandlers.TriggerHandler 
triggerHandler = handler.getTriggerHandler();
-               String[] triggerPaths = triggerHandler.getPaths();
-               Assert.assertEquals(2, triggerPaths.length);
-               List<String> triggerPathsList = Arrays.asList(triggerPaths);
-               
Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint"));
-               
Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"));
-
-               JobCancellationWithSavepointHandlers.InProgressHandler 
progressHandler = handler.getInProgressHandler();
-               String[] progressPaths = progressHandler.getPaths();
-               Assert.assertEquals(1, progressPaths.length);
-               
Assert.assertEquals("/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId",
 progressPaths[0]);
-       }
-
-       /**
-        * Tests that the cancellation ask timeout respects the checkpoint 
timeout.
-        * Otherwise, AskTimeoutExceptions are bound to happen for large state.
-        */
-       @Test
-       public void testAskTimeoutEqualsCheckpointTimeout() throws Exception {
-               long timeout = 128288238L;
-               JobID jobId = new JobID();
-               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
-               ExecutionGraph graph = mock(ExecutionGraph.class);
-               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-               when(holder.getExecutionGraph(eq(jobId), 
any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-               when(graph.getCheckpointCoordinator()).thenReturn(coord);
-               when(coord.getCheckpointTimeout()).thenReturn(timeout);
-
-               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
-               JobCancellationWithSavepointHandlers.TriggerHandler handler = 
handlers.getTriggerHandler();
-
-               Map<String, String> params = new HashMap<>();
-               params.put("jobid", jobId.toString());
-               params.put("targetDirectory", "placeholder");
-
-               JobManagerGateway jobManager = mock(JobManagerGateway.class);
-               when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
-
-               handler.handleRequest(params, Collections.emptyMap(), 
jobManager);
-
-               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
anyString(), any(Time.class));
-       }
-
-       /**
-        * Tests that the savepoint directory configuration is respected.
-        */
-       @Test
-       public void testSavepointDirectoryConfiguration() throws Exception {
-               long timeout = 128288238L;
-               JobID jobId = new JobID();
-               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
-               ExecutionGraph graph = mock(ExecutionGraph.class);
-               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-               when(holder.getExecutionGraph(eq(jobId), 
any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-               when(graph.getCheckpointCoordinator()).thenReturn(coord);
-               when(coord.getCheckpointTimeout()).thenReturn(timeout);
-
-               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
-               JobCancellationWithSavepointHandlers.TriggerHandler handler = 
handlers.getTriggerHandler();
-
-               Map<String, String> params = new HashMap<>();
-               params.put("jobid", jobId.toString());
-
-               JobManagerGateway jobManager = mock(JobManagerGateway.class);
-               when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
-
-               // 1. Use targetDirectory path param
-               params.put("targetDirectory", "custom-directory");
-               handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
-
-               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), any(Time.class));
-
-               // 2. Use default
-               params.remove("targetDirectory");
-
-               handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
-
-               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("the-default-directory"), any(Time.class));
-
-               // 3. Throw Exception
-               handlers = new JobCancellationWithSavepointHandlers(holder, 
executor, null);
-               handler = handlers.getTriggerHandler();
-
-               try {
-                       handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager).get();
-                       fail("Did not throw expected test Exception");
-               } catch (Exception e) {
-                       IllegalStateException cause = (IllegalStateException) 
e.getCause();
-                       assertEquals(true, 
cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key()));
-               }
-       }
-
-       /**
-        * Tests triggering a new request and monitoring it.
-        */
-       @Test
-       public void testTriggerNewRequest() throws Exception {
-               JobID jobId = new JobID();
-               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
-               ExecutionGraph graph = mock(ExecutionGraph.class);
-               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-               when(holder.getExecutionGraph(eq(jobId), 
any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-               when(graph.getCheckpointCoordinator()).thenReturn(coord);
-
-               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
-               JobCancellationWithSavepointHandlers.TriggerHandler trigger = 
handlers.getTriggerHandler();
-               JobCancellationWithSavepointHandlers.InProgressHandler progress 
= handlers.getInProgressHandler();
-
-               Map<String, String> params = new HashMap<>();
-               params.put("jobid", jobId.toString());
-               params.put("targetDirectory", "custom-directory");
-
-               JobManagerGateway jobManager = mock(JobManagerGateway.class);
-
-               // Successful
-               CompletableFuture<String> successfulCancelWithSavepoint = new 
CompletableFuture<>();
-               when(jobManager.cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), 
any(Time.class))).thenReturn(successfulCancelWithSavepoint);
-
-               // Trigger
-               FullHttpResponse response = trigger.handleRequest(params, 
Collections.emptyMap(), jobManager).get();
-
-               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), any(Time.class));
-
-               String location = 
String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
-
-               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-               assertEquals("application/json; charset=UTF-8", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-               assertEquals(location, 
response.headers().get(HttpHeaders.Names.LOCATION));
-
-               String json = 
response.content().toString(Charset.forName("UTF-8"));
-               JsonNode root = new ObjectMapper().readTree(json);
-
-               assertEquals("accepted", root.get("status").asText());
-               assertEquals("1", root.get("request-id").asText());
-               assertEquals(location, root.get("location").asText());
-
-               // Trigger again
-               response = trigger.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager).get();
-               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-               assertEquals("application/json; charset=UTF-8", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-               assertEquals(location, 
response.headers().get(HttpHeaders.Names.LOCATION));
-
-               json = response.content().toString(Charset.forName("UTF-8"));
-               root = new ObjectMapper().readTree(json);
-
-               assertEquals("accepted", root.get("status").asText());
-               assertEquals("1", root.get("request-id").asText());
-               assertEquals(location, root.get("location").asText());
-
-               // Only single actual request
-               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), any(Time.class));
-
-               // Query progress
-               params.put("requestId", "1");
-
-               response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager).get();
-               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-               assertEquals("application/json; charset=UTF-8", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-               json = response.content().toString(Charset.forName("UTF-8"));
-               root = new ObjectMapper().readTree(json);
-
-               assertEquals("in-progress", root.get("status").asText());
-               assertEquals("1", root.get("request-id").asText());
-
-               // Complete
-               successfulCancelWithSavepoint.complete("_path-savepoint_");
-
-               response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager).get();
-
-               assertEquals(HttpResponseStatus.CREATED, response.getStatus());
-               assertEquals("application/json; charset=UTF-8", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-               json = response.content().toString(Charset.forName("UTF-8"));
-
-               root = new ObjectMapper().readTree(json);
-
-               assertEquals("success", root.get("status").asText());
-               assertEquals("1", root.get("request-id").asText());
-               assertEquals("_path-savepoint_", 
root.get("savepoint-path").asText());
-
-               // Query again, keep recent history
-
-               response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager).get();
-
-               assertEquals(HttpResponseStatus.CREATED, response.getStatus());
-               assertEquals("application/json; charset=UTF-8", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-               json = response.content().toString(Charset.forName("UTF-8"));
-
-               root = new ObjectMapper().readTree(json);
-
-               assertEquals("success", root.get("status").asText());
-               assertEquals("1", root.get("request-id").asText());
-               assertEquals("_path-savepoint_", 
root.get("savepoint-path").asText());
-
-               // Query for unknown request
-               params.put("requestId", "9929");
-
-               response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager).get();
-               assertEquals(HttpResponseStatus.BAD_REQUEST, 
response.getStatus());
-               assertEquals("application/json; charset=UTF-8", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-               json = response.content().toString(Charset.forName("UTF-8"));
-
-               root = new ObjectMapper().readTree(json);
-
-               assertEquals("failed", root.get("status").asText());
-               assertEquals("9929", root.get("request-id").asText());
-               assertEquals("Unknown job/request ID", 
root.get("cause").asText());
-       }
-
-       /**
-        * Tests response when a request fails.
-        */
-       @Test
-       public void testFailedCancellation() throws Exception {
-               JobID jobId = new JobID();
-               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
-               ExecutionGraph graph = mock(ExecutionGraph.class);
-               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-               when(holder.getExecutionGraph(eq(jobId), 
any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-               when(graph.getCheckpointCoordinator()).thenReturn(coord);
-
-               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
-               JobCancellationWithSavepointHandlers.TriggerHandler trigger = 
handlers.getTriggerHandler();
-               JobCancellationWithSavepointHandlers.InProgressHandler progress 
= handlers.getInProgressHandler();
-
-               Map<String, String> params = new HashMap<>();
-               params.put("jobid", jobId.toString());
-               params.put("targetDirectory", "custom-directory");
-
-               JobManagerGateway jobManager = mock(JobManagerGateway.class);
-
-               // Successful
-               CompletableFuture<String> unsuccessfulCancelWithSavepoint = 
FutureUtils.completedExceptionally(new Exception("Test Exception"));
-               when(jobManager.cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), 
any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint);
-
-               // Trigger
-               trigger.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
-               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), any(Time.class));
-
-               // Query progress
-               params.put("requestId", "1");
-
-               FullHttpResponse response = progress.handleRequest(params, 
Collections.<String, String>emptyMap(), jobManager).get();
-               assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
response.getStatus());
-               assertEquals("application/json; charset=UTF-8", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-               String json = 
response.content().toString(Charset.forName("UTF-8"));
-               JsonNode root = new ObjectMapper().readTree(json);
-
-               assertEquals("failed", root.get("status").asText());
-               assertEquals("1", root.get("request-id").asText());
-               assertEquals("Test Exception", root.get("cause").asText());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
deleted file mode 100644
index 1c08ae8..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobConfigHandler.
- */
-public class JobConfigHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
JobConfigHandler.JobConfigJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/config", archive.getPath());
-               compareJobConfig(originalJob, archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               JobConfigHandler handler = new 
JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/config", paths[0]);
-       }
-
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               String answer = 
JobConfigHandler.createJobConfigJson(originalJob);
-               compareJobConfig(originalJob, answer);
-       }
-
-       private static void compareJobConfig(AccessExecutionGraph originalJob, 
String answer) throws IOException {
-               JsonNode job = 
ArchivedJobGenerationUtils.MAPPER.readTree(answer);
-
-               Assert.assertEquals(originalJob.getJobID().toString(), 
job.get("jid").asText());
-               Assert.assertEquals(originalJob.getJobName(), 
job.get("name").asText());
-
-               ArchivedExecutionConfig originalConfig = 
originalJob.getArchivedExecutionConfig();
-               JsonNode config = job.get("execution-config");
-
-               Assert.assertEquals(originalConfig.getExecutionMode(), 
config.get("execution-mode").asText());
-               
Assert.assertEquals(originalConfig.getRestartStrategyDescription(), 
config.get("restart-strategy").asText());
-               Assert.assertEquals(originalConfig.getParallelism(), 
config.get("job-parallelism").asInt());
-               Assert.assertEquals(originalConfig.getObjectReuseEnabled(), 
config.get("object-reuse-mode").asBoolean());
-
-               Map<String, String> originalUserConfig = 
originalConfig.getGlobalJobParameters();
-               JsonNode userConfig = config.get("user-config");
-
-               for (Map.Entry<String, String> originalEntry : 
originalUserConfig.entrySet()) {
-                       Assert.assertEquals(originalEntry.getValue(), 
userConfig.get(originalEntry.getKey()).asText());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
deleted file mode 100644
index ee0498e..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobDetailsHandler.
- */
-public class JobDetailsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
JobDetailsHandler.JobDetailsJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(2, archives.size());
-
-               Iterator<ArchivedJson> iterator = archives.iterator();
-               ArchivedJson archive1 = iterator.next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID(), 
archive1.getPath());
-               compareJobDetails(originalJob, archive1.getJson());
-
-               ArchivedJson archive2 = iterator.next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices", archive2.getPath());
-               compareJobDetails(originalJob, archive2.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               JobDetailsHandler handler = new 
JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), 
null);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(2, paths.length);
-               List<String> pathsList = Lists.newArrayList(paths);
-               Assert.assertTrue(pathsList.contains("/jobs/:jobid"));
-               Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices"));
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               String json = 
JobDetailsHandler.createJobDetailsJson(originalJob, null);
-
-               compareJobDetails(originalJob, json);
-       }
-
-       private static void compareJobDetails(AccessExecutionGraph originalJob, 
String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(originalJob.getJobID().toString(), 
result.get("jid").asText());
-               Assert.assertEquals(originalJob.getJobName(), 
result.get("name").asText());
-               Assert.assertEquals(originalJob.isStoppable(), 
result.get("isStoppable").asBoolean());
-               Assert.assertEquals(originalJob.getState().name(), 
result.get("state").asText());
-
-               
Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), 
result.get("start-time").asLong());
-               
Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), 
result.get("end-time").asLong());
-               Assert.assertEquals(
-                       originalJob.getStatusTimestamp(originalJob.getState()) 
- originalJob.getStatusTimestamp(JobStatus.CREATED),
-                       result.get("duration").asLong()
-               );
-
-               JsonNode timestamps = result.get("timestamps");
-               for (JobStatus status : JobStatus.values()) {
-                       
Assert.assertEquals(originalJob.getStatusTimestamp(status), 
timestamps.get(status.name()).asLong());
-               }
-
-               ArrayNode tasks = (ArrayNode) result.get("vertices");
-               int x = 0;
-               for (AccessExecutionJobVertex expectedTask : 
originalJob.getVerticesTopologically()) {
-                       JsonNode task = tasks.get(x);
-
-                       
Assert.assertEquals(expectedTask.getJobVertexId().toString(), 
task.get("id").asText());
-                       Assert.assertEquals(expectedTask.getName(), 
task.get("name").asText());
-                       Assert.assertEquals(expectedTask.getParallelism(), 
task.get("parallelism").asInt());
-                       
Assert.assertEquals(expectedTask.getAggregateState().name(), 
task.get("status").asText());
-
-                       Assert.assertEquals(3, task.get("start-time").asLong());
-                       Assert.assertEquals(5, task.get("end-time").asLong());
-                       Assert.assertEquals(2, task.get("duration").asLong());
-
-                       JsonNode subtasksPerState = task.get("tasks");
-                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.CREATED.name()).asInt());
-                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt());
-                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt());
-                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.RUNNING.name()).asInt());
-                       Assert.assertEquals(1, 
subtasksPerState.get(ExecutionState.FINISHED.name()).asInt());
-                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.CANCELING.name()).asInt());
-                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.CANCELED.name()).asInt());
-                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.FAILED.name()).asInt());
-
-                       long expectedNumBytesIn = 0;
-                       long expectedNumBytesOut = 0;
-                       long expectedNumRecordsIn = 0;
-                       long expectedNumRecordsOut = 0;
-
-                       for (AccessExecutionVertex vertex : 
expectedTask.getTaskVertices()) {
-                               IOMetrics ioMetrics = 
vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-                               expectedNumBytesIn += 
ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-                               expectedNumBytesOut += 
ioMetrics.getNumBytesOut();
-                               expectedNumRecordsIn += 
ioMetrics.getNumRecordsIn();
-                               expectedNumRecordsOut += 
ioMetrics.getNumRecordsOut();
-                       }
-
-                       JsonNode metrics = task.get("metrics");
-
-                       Assert.assertEquals(expectedNumBytesIn, 
metrics.get("read-bytes").asLong());
-                       Assert.assertEquals(expectedNumBytesOut, 
metrics.get("write-bytes").asLong());
-                       Assert.assertEquals(expectedNumRecordsIn, 
metrics.get("read-records").asLong());
-                       Assert.assertEquals(expectedNumRecordsOut, 
metrics.get("write-records").asLong());
-
-                       x++;
-               }
-               Assert.assertEquals(1, tasks.size());
-
-               JsonNode statusCounts = result.get("status-counts");
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CREATED.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
-               Assert.assertEquals(1, 
statusCounts.get(ExecutionState.RUNNING.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.FINISHED.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CANCELING.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CANCELED.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.FAILED.name()).asInt());
-
-               
Assert.assertEquals(ArchivedJobGenerationUtils.MAPPER.readTree(originalJob.getJsonPlan()),
 result.get("plan"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
deleted file mode 100644
index 6e0f918..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-import org.apache.flink.util.ExceptionUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobExceptionsHandler.
- */
-public class JobExceptionsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
JobExceptionsHandler.JobExceptionsJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/exceptions", archive.getPath());
-               compareExceptions(originalJob, archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               JobExceptionsHandler handler = new 
JobExceptionsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               String json = 
JobExceptionsHandler.createJobExceptionsJson(originalJob);
-
-               compareExceptions(originalJob, json);
-       }
-
-       private static void compareExceptions(AccessExecutionGraph originalJob, 
String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               
Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), 
result.get("root-exception").asText());
-               
Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), 
result.get("timestamp").asLong());
-
-               ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
-
-               int x = 0;
-               for (AccessExecutionVertex expectedSubtask : 
originalJob.getAllExecutionVertices()) {
-                       if 
(!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION))
 {
-                               JsonNode exception = exceptions.get(x);
-
-                               
Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), 
exception.get("exception").asText());
-                               
Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), 
exception.get("timestamp").asLong());
-                               
Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), 
exception.get("task").asText());
-
-                               TaskManagerLocation location = 
expectedSubtask.getCurrentAssignedResourceLocation();
-                               String expectedLocationString = 
location.getFQDNHostname() + ':' + location.dataPort();
-                               Assert.assertEquals(expectedLocationString, 
exception.get("location").asText());
-                       }
-                       x++;
-               }
-               Assert.assertEquals(x > 
JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, 
result.get("truncated").asBoolean());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
deleted file mode 100644
index 94fd5a8..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for the JobManagerConfigHandler.
- */
-public class JobManagerConfigHandlerTest {
-       @Test
-       public void testGetPaths() {
-               JobManagerConfigHandler handler = new 
JobManagerConfigHandler(Executors.directExecutor(), null);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobmanager/config", paths[0]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
deleted file mode 100644
index 4a934ec..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobPlanHandler.
- */
-public class JobPlanHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
JobPlanHandler.JobPlanJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/plan", archive.getPath());
-               Assert.assertEquals(originalJob.getJsonPlan(), 
archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               JobPlanHandler handler = new 
JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/plan", paths[0]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
deleted file mode 100644
index 8c05c83..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * Tests for the JobStoppingHandler.
- */
-public class JobStoppingHandlerTest extends TestLogger {
-       @Test
-       public void testGetPaths() {
-               JobStoppingHandler handler = new 
JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(2, paths.length);
-               List<String> pathsList = Lists.newArrayList(paths);
-               Assert.assertTrue(pathsList.contains("/jobs/:jobid/stop"));
-               Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-stop"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
deleted file mode 100644
index 5af9aa6..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobVertexAccumulatorsHandler.
- */
-public class JobVertexAccumulatorsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() + "/accumulators", 
archive.getPath());
-               compareAccumulators(originalTask, archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               JobVertexAccumulatorsHandler handler = new 
JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               String json = 
JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
-
-               compareAccumulators(originalTask, json);
-       }
-
-       private static void compareAccumulators(AccessExecutionJobVertex 
originalTask, String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
-
-               ArrayNode accs = (ArrayNode) result.get("user-accumulators");
-               StringifiedAccumulatorResult[] expectedAccs = 
originalTask.getAggregatedUserAccumulatorsStringified();
-
-               
ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
deleted file mode 100644
index 0d15e08..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import scala.Option;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for back pressure handler responses.
- */
-public class JobVertexBackPressureHandlerTest {
-       @Test
-       public void testGetPaths() {
-               JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
-       }
-
-       /** Tests the response when no stats are available. */
-       @Test
-       public void testResponseNoStatsAvailable() throws Exception {
-               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
-
-               
when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
-                               
.thenReturn(Option.<OperatorBackPressureStats>empty());
-
-               JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler(
-                               mock(ExecutionGraphHolder.class),
-                               Executors.directExecutor(),
-                               statsTracker,
-                               9999);
-
-               String response = handler.handleRequest(jobVertex, 
Collections.<String, String>emptyMap()).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode rootNode = mapper.readTree(response);
-
-               // Single element
-               assertEquals(1, rootNode.size());
-
-               // Status
-               JsonNode status = rootNode.get("status");
-               assertNotNull(status);
-               assertEquals("deprecated", status.textValue());
-
-               
verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
-       }
-
-       /** Tests the response when stats are available. */
-       @Test
-       public void testResponseStatsAvailable() throws Exception {
-               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
-
-               OperatorBackPressureStats stats = new OperatorBackPressureStats(
-                               0, System.currentTimeMillis(), new double[] { 
0.31, 0.48, 1.0, 0.0 });
-
-               
when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
-                               .thenReturn(Option.apply(stats));
-
-               JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler(
-                               mock(ExecutionGraphHolder.class),
-                               Executors.directExecutor(),
-                               statsTracker,
-                               9999);
-
-               String response = handler.handleRequest(jobVertex, 
Collections.<String, String>emptyMap()).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode rootNode = mapper.readTree(response);
-
-               // Single element
-               assertEquals(4, rootNode.size());
-
-               // Status
-               JsonNode status = rootNode.get("status");
-               assertNotNull(status);
-               assertEquals("ok", status.textValue());
-
-               // Back pressure level
-               JsonNode backPressureLevel = rootNode.get("backpressure-level");
-               assertNotNull(backPressureLevel);
-               assertEquals("high", backPressureLevel.textValue());
-
-               // End time stamp
-               JsonNode endTimeStamp = rootNode.get("end-timestamp");
-               assertNotNull(endTimeStamp);
-               assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
-
-               // Subtasks
-               JsonNode subTasks = rootNode.get("subtasks");
-               assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
-               for (int i = 0; i < subTasks.size(); i++) {
-                       JsonNode subTask = subTasks.get(i);
-
-                       JsonNode index = subTask.get("subtask");
-                       assertEquals(i, index.intValue());
-
-                       JsonNode level = subTask.get("backpressure-level");
-                       assertEquals(JobVertexBackPressureHandler
-                                       
.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
-
-                       JsonNode ratio = subTask.get("ratio");
-                       assertEquals(stats.getBackPressureRatio(i), 
ratio.doubleValue(), 0.0);
-               }
-
-               // Verify not triggered
-               verify(statsTracker, 
never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
-       }
-
-       /** Tests that after the refresh interval another sample is triggered. 
*/
-       @Test
-       public void testResponsePassedRefreshInterval() throws Exception {
-               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
-
-               OperatorBackPressureStats stats = new OperatorBackPressureStats(
-                               0, System.currentTimeMillis(), new double[] { 
0.31, 0.48, 1.0, 0.0 });
-
-               
when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
-                               .thenReturn(Option.apply(stats));
-
-               JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler(
-                               mock(ExecutionGraphHolder.class),
-                               Executors.directExecutor(),
-                               statsTracker,
-                               0); // <----- refresh interval should fire 
immediately
-
-               String response = handler.handleRequest(jobVertex, 
Collections.<String, String>emptyMap()).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode rootNode = mapper.readTree(response);
-
-               // Single element
-               assertEquals(4, rootNode.size());
-
-               // Status
-               JsonNode status = rootNode.get("status");
-               assertNotNull(status);
-               // Interval passed, hence deprecated
-               assertEquals("deprecated", status.textValue());
-
-               // Back pressure level
-               JsonNode backPressureLevel = rootNode.get("backpressure-level");
-               assertNotNull(backPressureLevel);
-               assertEquals("high", backPressureLevel.textValue());
-
-               // End time stamp
-               JsonNode endTimeStamp = rootNode.get("end-timestamp");
-               assertNotNull(endTimeStamp);
-               assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
-
-               // Subtasks
-               JsonNode subTasks = rootNode.get("subtasks");
-               assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
-               for (int i = 0; i < subTasks.size(); i++) {
-                       JsonNode subTask = subTasks.get(i);
-
-                       JsonNode index = subTask.get("subtask");
-                       assertEquals(i, index.intValue());
-
-                       JsonNode level = subTask.get("backpressure-level");
-                       assertEquals(JobVertexBackPressureHandler
-                                       
.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
-
-                       JsonNode ratio = subTask.get("ratio");
-                       assertEquals(stats.getBackPressureRatio(i), 
ratio.doubleValue(), 0.0);
-               }
-
-               // Verify triggered
-               
verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
deleted file mode 100644
index 1b8d9aa..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobVertexDetailsHandler.
- */
-public class JobVertexDetailsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId(), archive.getPath());
-               compareVertexDetails(originalTask, archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               JobVertexDetailsHandler handler = new 
JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), null);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", 
paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               String json = JobVertexDetailsHandler.createVertexDetailsJson(
-                       originalTask, 
ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
-
-               compareVertexDetails(originalTask, json);
-       }
-
-       private static void compareVertexDetails(AccessExecutionJobVertex 
originalTask, String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
-               Assert.assertEquals(originalTask.getName(), 
result.get("name").asText());
-               Assert.assertEquals(originalTask.getParallelism(), 
result.get("parallelism").asInt());
-               Assert.assertTrue(result.get("now").asLong() > 0);
-
-               ArrayNode subtasks = (ArrayNode) result.get("subtasks");
-
-               Assert.assertEquals(originalTask.getTaskVertices().length, 
subtasks.size());
-               for (int x = 0; x < originalTask.getTaskVertices().length; x++) 
{
-                       AccessExecutionVertex expectedSubtask = 
originalTask.getTaskVertices()[x];
-                       JsonNode subtask = subtasks.get(x);
-
-                       Assert.assertEquals(x, subtask.get("subtask").asInt());
-                       
Assert.assertEquals(expectedSubtask.getExecutionState().name(), 
subtask.get("status").asText());
-                       
Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(),
 subtask.get("attempt").asInt());
-
-                       TaskManagerLocation location = 
expectedSubtask.getCurrentAssignedResourceLocation();
-                       String expectedLocationString = location.getHostname() 
+ ":" + location.dataPort();
-                       Assert.assertEquals(expectedLocationString, 
subtask.get("host").asText());
-                       long start = 
expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING);
-                       Assert.assertEquals(start, 
subtask.get("start-time").asLong());
-                       long end = 
expectedSubtask.getStateTimestamp(ExecutionState.FINISHED);
-                       Assert.assertEquals(end, 
subtask.get("end-time").asLong());
-                       Assert.assertEquals(end - start, 
subtask.get("duration").asLong());
-
-                       
ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(),
 subtask.get("metrics"));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
deleted file mode 100644
index badb952..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobVertexTaskManagersHandler.
- */
-public class JobVertexTaskManagersHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               AccessExecutionVertex originalSubtask = 
ArchivedJobGenerationUtils.getTestSubtask();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", 
archive.getPath());
-               compareVertexTaskManagers(originalTask, originalSubtask, 
archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               JobVertexTaskManagersHandler handler = new 
JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), null);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               AccessExecutionVertex originalSubtask = 
ArchivedJobGenerationUtils.getTestSubtask();
-               String json = 
JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
-                       originalTask, 
ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
-
-               compareVertexTaskManagers(originalTask, originalSubtask, json);
-       }
-
-       private static void compareVertexTaskManagers(AccessExecutionJobVertex 
originalTask, AccessExecutionVertex originalSubtask, String json) throws 
IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
-               Assert.assertEquals(originalTask.getName(), 
result.get("name").asText());
-               Assert.assertTrue(result.get("now").asLong() > 0);
-
-               ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers");
-
-               JsonNode taskManager = taskmanagers.get(0);
-
-               TaskManagerLocation location = 
originalSubtask.getCurrentAssignedResourceLocation();
-               String expectedLocationString = location.getHostname() + ':' + 
location.dataPort();
-               Assert.assertEquals(expectedLocationString, 
taskManager.get("host").asText());
-               Assert.assertEquals(ExecutionState.FINISHED.name(), 
taskManager.get("status").asText());
-
-               Assert.assertEquals(3, taskManager.get("start-time").asLong());
-               Assert.assertEquals(5, taskManager.get("end-time").asLong());
-               Assert.assertEquals(2, taskManager.get("duration").asLong());
-
-               JsonNode statusCounts = taskManager.get("status-counts");
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CREATED.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.RUNNING.name()).asInt());
-               Assert.assertEquals(1, 
statusCounts.get(ExecutionState.FINISHED.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CANCELING.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CANCELED.name()).asInt());
-               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.FAILED.name()).asInt());
-
-               long expectedNumBytesIn = 0;
-               long expectedNumBytesOut = 0;
-               long expectedNumRecordsIn = 0;
-               long expectedNumRecordsOut = 0;
-
-               for (AccessExecutionVertex vertex : 
originalTask.getTaskVertices()) {
-                       IOMetrics ioMetrics = 
vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-                       expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + 
ioMetrics.getNumBytesInRemote();
-                       expectedNumBytesOut += ioMetrics.getNumBytesOut();
-                       expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
-                       expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
-               }
-
-               JsonNode metrics = taskManager.get("metrics");
-
-               Assert.assertEquals(expectedNumBytesIn, 
metrics.get("read-bytes").asLong());
-               Assert.assertEquals(expectedNumBytesOut, 
metrics.get("write-bytes").asLong());
-               Assert.assertEquals(expectedNumRecordsIn, 
metrics.get("read-records").asLong());
-               Assert.assertEquals(expectedNumRecordsOut, 
metrics.get("write-records").asLong());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
deleted file mode 100644
index a80bac9..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtaskCurrentAttemptDetailsHandler.
- */
-public class SubtaskCurrentAttemptDetailsHandlerTest {
-       @Test
-       public void testGetPaths() {
-               SubtaskCurrentAttemptDetailsHandler handler = new 
SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), null);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", 
paths[0]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
deleted file mode 100644
index 6773fd4..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtaskExecutionAttemptAccumulatorsHandler.
- */
-public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals(
-                       "/jobs/" + originalJob.getJobID() +
-                       "/vertices/" + originalTask.getJobVertexId() +
-                       "/subtasks/" + 
originalAttempt.getParallelSubtaskIndex() +
-                       "/attempts/" + originalAttempt.getAttemptNumber() +
-                       "/accumulators",
-                       archive.getPath());
-               compareAttemptAccumulators(originalAttempt, archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               SubtaskExecutionAttemptAccumulatorsHandler handler = new 
SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators",
 paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
-               String json = 
SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
-
-               compareAttemptAccumulators(originalAttempt, json);
-       }
-
-       private static void compareAttemptAccumulators(AccessExecution 
originalAttempt, String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), 
result.get("subtask").asInt());
-               Assert.assertEquals(originalAttempt.getAttemptNumber(), 
result.get("attempt").asInt());
-               Assert.assertEquals(originalAttempt.getAttemptId().toString(), 
result.get("id").asText());
-
-               
ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(),
 (ArrayNode) result.get("user-accumulators"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
deleted file mode 100644
index 7777d2d..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtaskExecutionAttemptDetailsHandler.
- */
-public class SubtaskExecutionAttemptDetailsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(2, archives.size());
-
-               Iterator<ArchivedJson> iterator = archives.iterator();
-               ArchivedJson archive1 = iterator.next();
-               Assert.assertEquals(
-                       "/jobs/" + originalJob.getJobID() +
-                               "/vertices/" + originalTask.getJobVertexId() +
-                               "/subtasks/" + 
originalAttempt.getParallelSubtaskIndex(),
-                       archive1.getPath());
-               compareAttemptDetails(originalAttempt, archive1.getJson());
-
-               ArchivedJson archive2 = iterator.next();
-               Assert.assertEquals(
-                       "/jobs/" + originalJob.getJobID() +
-                               "/vertices/" + originalTask.getJobVertexId() +
-                               "/subtasks/" + 
originalAttempt.getParallelSubtaskIndex() +
-                               "/attempts/" + 
originalAttempt.getAttemptNumber(),
-                       archive2.getPath());
-               compareAttemptDetails(originalAttempt, archive2.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               SubtaskExecutionAttemptDetailsHandler handler = new 
SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(),  null);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt",
 paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
-               String json = 
SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
-                       originalAttempt, originalJob.getJobID().toString(), 
originalTask.getJobVertexId().toString(), null);
-
-               compareAttemptDetails(originalAttempt, json);
-       }
-
-       private static void compareAttemptDetails(AccessExecution 
originalAttempt, String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), 
result.get("subtask").asInt());
-               Assert.assertEquals(originalAttempt.getState().name(), 
result.get("status").asText());
-               Assert.assertEquals(originalAttempt.getAttemptNumber(), 
result.get("attempt").asInt());
-               
Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(),
 result.get("host").asText());
-               long start = 
originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
-               Assert.assertEquals(start, result.get("start-time").asLong());
-               long end = 
originalAttempt.getStateTimestamp(ExecutionState.FINISHED);
-               Assert.assertEquals(end, result.get("end-time").asLong());
-               Assert.assertEquals(end - start, 
result.get("duration").asLong());
-
-               
ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), 
result.get("metrics"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
deleted file mode 100644
index 7b400da..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtasksAllAccumulatorsHandler.
- */
-public class SubtasksAllAccumulatorsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() +
-                       "/subtasks/accumulators", archive.getPath());
-               compareSubtaskAccumulators(originalTask, archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               SubtasksAllAccumulatorsHandler handler = new 
SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", 
paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               String json = 
SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
-               compareSubtaskAccumulators(originalTask, json);
-       }
-
-       private static void compareSubtaskAccumulators(AccessExecutionJobVertex 
originalTask, String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
-               Assert.assertEquals(originalTask.getParallelism(), 
result.get("parallelism").asInt());
-
-               ArrayNode subtasks = (ArrayNode) result.get("subtasks");
-
-               Assert.assertEquals(originalTask.getTaskVertices().length, 
subtasks.size());
-               for (int x = 0; x < originalTask.getTaskVertices().length; x++) 
{
-                       JsonNode subtask = subtasks.get(x);
-                       AccessExecutionVertex expectedSubtask = 
originalTask.getTaskVertices()[x];
-
-                       Assert.assertEquals(x, subtask.get("subtask").asInt());
-                       
Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(),
 subtask.get("attempt").asInt());
-                       
Assert.assertEquals(expectedSubtask.getCurrentAssignedResourceLocation().getHostname(),
 subtask.get("host").asText());
-
-                       
ArchivedJobGenerationUtils.compareStringifiedAccumulators(
-                               
expectedSubtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified(),
-                               (ArrayNode) subtask.get("user-accumulators"));
-               }
-       }
-}

Reply via email to