http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
deleted file mode 100644
index 9c5e168..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
-import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the CheckpointStatsSubtaskDetailsHandler.
- */
-public class CheckpointStatsSubtaskDetailsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist();
-               ObjectMapper mapper = new ObjectMapper();
-
-               PendingCheckpointStats checkpoint = 
mock(PendingCheckpointStats.class);
-               when(checkpoint.getCheckpointId()).thenReturn(1992139L);
-               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-               when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack 
timestamp = duration
-
-               TaskStateStats task = createTaskStateStats(1237);
-               
when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task));
-
-               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
-               
when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint));
-               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
-               when(snapshot.getHistory()).thenReturn(history);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-               when(graph.getJobID()).thenReturn(new JobID());
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(graph);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals(
-                       "/jobs/" + graph.getJobID() + "/checkpoints/details/" + 
checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(),
-                       archive.getPath());
-               JsonNode rootNode = mapper.readTree(archive.getJson());
-               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
-               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
-
-               verifyTaskNode(rootNode, task, 
checkpoint.getTriggerTimestamp());
-       }
-
-       @Test
-       public void testGetPaths() {
-               CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid",
 paths[0]);
-       }
-
-       /**
-        * Tests a subtask details request.
-        */
-       @Test
-       public void testSubtaskRequest() throws Exception {
-               PendingCheckpointStats checkpoint = 
mock(PendingCheckpointStats.class);
-               when(checkpoint.getCheckpointId()).thenReturn(1992139L);
-               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-               when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack 
timestamp = duration
-
-               TaskStateStats task = createTaskStateStats(1237);
-               
when(checkpoint.getTaskStateStats(any(JobVertexID.class))).thenReturn(task);
-
-               JsonNode rootNode = triggerRequest(checkpoint);
-               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
-               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
-
-               verifyTaskNode(rootNode, task, 
checkpoint.getTriggerTimestamp());
-       }
-
-       /**
-        * Tests a subtask details request.
-        */
-       @Test
-       public void testSubtaskRequestNoSummary() throws Exception {
-               PendingCheckpointStats checkpoint = 
mock(PendingCheckpointStats.class);
-               when(checkpoint.getCheckpointId()).thenReturn(1992139L);
-               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-               when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack 
timestamp = duration
-
-               TaskStateStats task = createTaskStateStats(0); // no 
acknowledged
-               
when(checkpoint.getTaskStateStats(any(JobVertexID.class))).thenReturn(task);
-
-               JsonNode rootNode = triggerRequest(checkpoint);
-               assertNull(rootNode.get("summary"));
-       }
-
-       /**
-        * Tests request with illegal checkpoint ID param.
-        */
-       @Test
-       public void testIllegalCheckpointId() throws Exception {
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "illegal checkpoint");
-               String json = handler.handleRequest(graph, params).get();
-
-               assertEquals("{}", json);
-       }
-
-       /**
-        * Tests request with missing checkpoint ID param.
-        */
-       @Test
-       public void testNoCheckpointIdParam() throws Exception {
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap()).get();
-
-               assertEquals("{}", json);
-       }
-
-       /**
-        * Test lookup of not existing checkpoint in history.
-        */
-       @Test
-       public void testCheckpointNotFound() throws Exception {
-               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
-               when(history.getCheckpointById(anyLong())).thenReturn(null); // 
not found
-
-               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
-               when(snapshot.getHistory()).thenReturn(history);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-               CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "123");
-               params.put("vertexid", new JobVertexID().toString());
-               String json = handler.handleRequest(graph, params).get();
-
-               assertEquals("{}", json);
-               verify(history, times(1)).getCheckpointById(anyLong());
-       }
-
-       /**
-        * Tests request with illegal job vertex ID param.
-        */
-       @Test
-       public void testIllegalJobVertexIdParam() throws Exception {
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "1");
-               params.put("vertexid", "illegal vertex id");
-               String json = handler.handleRequest(graph, params).get();
-
-               assertEquals("{}", json);
-       }
-
-       /**
-        * Tests request with missing job vertex ID param.
-        */
-       @Test
-       public void testNoJobVertexIdParam() throws Exception {
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "1");
-               String json = handler.handleRequest(graph, params).get();
-
-               assertEquals("{}", json);
-       }
-
-       /**
-        * Test lookup of not existing job vertex ID in checkpoint.
-        */
-       @Test
-       public void testJobVertexNotFound() throws Exception {
-               PendingCheckpointStats inProgress = 
mock(PendingCheckpointStats.class);
-               
when(inProgress.getTaskStateStats(any(JobVertexID.class))).thenReturn(null); // 
not found
-               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
-               
when(history.getCheckpointById(anyLong())).thenReturn(inProgress);
-
-               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
-               when(snapshot.getHistory()).thenReturn(history);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-               CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "123");
-               params.put("vertexid", new JobVertexID().toString());
-               String json = handler.handleRequest(graph, params).get();
-
-               assertEquals("{}", json);
-               verify(inProgress, 
times(1)).getTaskStateStats(any(JobVertexID.class));
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static JsonNode triggerRequest(AbstractCheckpointStats 
checkpoint) throws Exception {
-               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
-               
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
-               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
-               when(snapshot.getHistory()).thenReturn(history);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-               CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "123");
-               params.put("vertexid", new JobVertexID().toString());
-               String json = handler.handleRequest(graph, params).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               return mapper.readTree(json);
-       }
-
-       private static TaskStateStats createTaskStateStats(int numAcknowledged) 
{
-               ThreadLocalRandom rand = ThreadLocalRandom.current();
-
-               TaskStateStats task = mock(TaskStateStats.class);
-               when(task.getJobVertexId()).thenReturn(new JobVertexID());
-               
when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1);
-               when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1);
-               
when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1);
-               
when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1);
-               when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) 
+ 1);
-               
when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(numAcknowledged);
-
-               TaskStateStats.TaskStateStatsSummary summary = 
mock(TaskStateStats.TaskStateStatsSummary.class);
-
-               
doReturn(createMinMaxAvgStats(rand)).when(summary).getStateSizeStats();
-               
doReturn(createMinMaxAvgStats(rand)).when(summary).getAckTimestampStats();
-               
doReturn(createMinMaxAvgStats(rand)).when(summary).getAlignmentBufferedStats();
-               
doReturn(createMinMaxAvgStats(rand)).when(summary).getAlignmentDurationStats();
-               
doReturn(createMinMaxAvgStats(rand)).when(summary).getSyncCheckpointDurationStats();
-               
doReturn(createMinMaxAvgStats(rand)).when(summary).getAsyncCheckpointDurationStats();
-
-               when(task.getSummaryStats()).thenReturn(summary);
-
-               SubtaskStateStats[] subtasks = new SubtaskStateStats[3];
-               subtasks[0] = createSubtaskStats(0, rand);
-               subtasks[1] = createSubtaskStats(1, rand);
-               subtasks[2] = null;
-
-               when(task.getSubtaskStats()).thenReturn(subtasks);
-
-               return task;
-       }
-
-       private static void verifyTaskNode(JsonNode taskNode, TaskStateStats 
task, long triggerTimestamp) {
-               long duration = ThreadLocalRandom.current().nextInt(128);
-
-               assertEquals(task.getLatestAckTimestamp(), 
taskNode.get("latest_ack_timestamp").asLong());
-               assertEquals(task.getStateSize(), 
taskNode.get("state_size").asLong());
-               
assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), 
taskNode.get("end_to_end_duration").asLong());
-               assertEquals(task.getAlignmentBuffered(), 
taskNode.get("alignment_buffered").asLong());
-               assertEquals(task.getNumberOfSubtasks(), 
taskNode.get("num_subtasks").asInt());
-               assertEquals(task.getNumberOfAcknowledgedSubtasks(), 
taskNode.get("num_acknowledged_subtasks").asInt());
-
-               TaskStateStats.TaskStateStatsSummary summary = 
task.getSummaryStats();
-               verifyMinMaxAvgStats(summary.getStateSizeStats(), 
taskNode.get("summary").get("state_size"));
-               verifyMinMaxAvgStats(summary.getSyncCheckpointDurationStats(), 
taskNode.get("summary").get("checkpoint_duration").get("sync"));
-               verifyMinMaxAvgStats(summary.getAsyncCheckpointDurationStats(), 
taskNode.get("summary").get("checkpoint_duration").get("async"));
-               verifyMinMaxAvgStats(summary.getAlignmentBufferedStats(), 
taskNode.get("summary").get("alignment").get("buffered"));
-               verifyMinMaxAvgStats(summary.getAlignmentDurationStats(), 
taskNode.get("summary").get("alignment").get("duration"));
-
-               JsonNode endToEndDurationNode = 
taskNode.get("summary").get("end_to_end_duration");
-               assertEquals(summary.getAckTimestampStats().getMinimum() - 
triggerTimestamp, endToEndDurationNode.get("min").asLong());
-               assertEquals(summary.getAckTimestampStats().getMaximum() - 
triggerTimestamp, endToEndDurationNode.get("max").asLong());
-               assertEquals((long) summary.getAckTimestampStats().getAverage() 
- triggerTimestamp, endToEndDurationNode.get("avg").asLong());
-
-               SubtaskStateStats[] subtasks = task.getSubtaskStats();
-               Iterator<JsonNode> it = taskNode.get("subtasks").iterator();
-
-               assertTrue(it.hasNext());
-               verifySubtaskStats(it.next(), 0, subtasks[0]);
-
-               assertTrue(it.hasNext());
-               verifySubtaskStats(it.next(), 1, subtasks[1]);
-
-               assertTrue(it.hasNext());
-               verifySubtaskStats(it.next(), 2, subtasks[2]);
-
-               assertFalse(it.hasNext());
-       }
-
-       private static SubtaskStateStats createSubtaskStats(int index, 
ThreadLocalRandom rand) {
-               SubtaskStateStats subtask = mock(SubtaskStateStats.class);
-               when(subtask.getSubtaskIndex()).thenReturn(index);
-               when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024));
-               
when(subtask.getAlignmentBuffered()).thenReturn(rand.nextLong(1024));
-               
when(subtask.getAlignmentDuration()).thenReturn(rand.nextLong(1024));
-               
when(subtask.getSyncCheckpointDuration()).thenReturn(rand.nextLong(1024));
-               
when(subtask.getAsyncCheckpointDuration()).thenReturn(rand.nextLong(1024));
-               when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024));
-               when(subtask.getStateSize()).thenReturn(rand.nextLong(1024));
-               
when(subtask.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024));
-               return subtask;
-       }
-
-       private static void verifySubtaskStats(JsonNode subtaskNode, int index, 
SubtaskStateStats subtask) {
-               if (subtask == null) {
-                       assertEquals(index, subtaskNode.get("index").asInt());
-                       assertEquals("pending_or_failed", 
subtaskNode.get("status").asText());
-               } else {
-                       assertEquals(subtask.getSubtaskIndex(), 
subtaskNode.get("index").asInt());
-                       assertEquals("completed", 
subtaskNode.get("status").asText());
-                       assertEquals(subtask.getAckTimestamp(), 
subtaskNode.get("ack_timestamp").asLong());
-                       assertEquals(subtask.getEndToEndDuration(0), 
subtaskNode.get("end_to_end_duration").asLong());
-                       assertEquals(subtask.getStateSize(), 
subtaskNode.get("state_size").asLong());
-                       assertEquals(subtask.getSyncCheckpointDuration(), 
subtaskNode.get("checkpoint").get("sync").asLong());
-                       assertEquals(subtask.getAsyncCheckpointDuration(), 
subtaskNode.get("checkpoint").get("async").asLong());
-                       assertEquals(subtask.getAlignmentBuffered(), 
subtaskNode.get("alignment").get("buffered").asLong());
-                       assertEquals(subtask.getAlignmentDuration(), 
subtaskNode.get("alignment").get("duration").asLong());
-               }
-       }
-
-       private static MinMaxAvgStats createMinMaxAvgStats(ThreadLocalRandom 
rand) {
-               MinMaxAvgStats mma = mock(MinMaxAvgStats.class);
-               when(mma.getMinimum()).thenReturn(rand.nextLong(1024));
-               when(mma.getMaximum()).thenReturn(rand.nextLong(1024));
-               when(mma.getAverage()).thenReturn(rand.nextLong(1024));
-
-               return mma;
-       }
-
-       private static void verifyMinMaxAvgStats(MinMaxAvgStats expected, 
JsonNode node) {
-               assertEquals(expected.getMinimum(), node.get("min").asLong());
-               assertEquals(expected.getMaximum(), node.get("max").asLong());
-               assertEquals(expected.getAverage(), node.get("avg").asLong());
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
index 2e52f2e..03666a8 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.history;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 
 import org.junit.Assert;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 33d9c79..3c93be3 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.ArchiveMessages;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
deleted file mode 100644
index 0755888..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the AbstractMetricsHandler.
- */
-public class AbstractMetricsHandlerTest extends TestLogger {
-       /**
-        * Verifies that the handlers correctly handle expected REST calls.
-        */
-       @Test
-       public void testHandleRequest() throws Exception {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-               JobVertexMetricsHandler handler = new 
JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-               Map<String, String> queryParams = new HashMap<>();
-
-               pathParams.put("jobid", "jobid");
-               pathParams.put("vertexid", "taskid");
-
-               // get list of available metrics
-               String availableList = handler.handleJsonRequest(pathParams, 
queryParams, null).get();
-
-               assertEquals("[" +
-                               "{\"id\":\"8.opname.abc.metric5\"}," +
-                               "{\"id\":\"8.abc.metric4\"}" +
-                               "]",
-                       availableList);
-
-               // get value for a single metric
-               queryParams.put("get", "8.opname.abc.metric5");
-
-               String metricValue = handler.handleJsonRequest(pathParams, 
queryParams, null).get();
-
-               assertEquals("[" +
-                               
"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
-                               "]"
-                       , metricValue
-               );
-
-               // get values for multiple metrics
-               queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
-
-               String metricValues = handler.handleJsonRequest(pathParams, 
queryParams, null).get();
-
-               assertEquals("[" +
-                               
"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
-                               "{\"id\":\"8.abc.metric4\",\"value\":\"3\"}" +
-                               "]",
-                       metricValues
-               );
-       }
-
-       /**
-        * Verifies that a malformed request for available metrics does not 
throw an exception.
-        */
-       @Test
-       public void testInvalidListDoesNotFail() {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-               JobVertexMetricsHandler handler = new 
JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-               Map<String, String> queryParams = new HashMap<>();
-
-               pathParams.put("jobid", "jobid");
-               pathParams.put("vertexid", "taskid");
-
-               //-----invalid variable
-               pathParams.put("jobid", "nonexistent");
-
-               try {
-                       assertEquals("", handler.handleJsonRequest(pathParams, 
queryParams, null).get());
-               } catch (Exception e) {
-                       fail();
-               }
-       }
-
-       /**
-        * Verifies that a malformed request for a metric value does not throw 
an exception.
-        */
-       @Test
-       public void testInvalidGetDoesNotFail() {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-               JobVertexMetricsHandler handler = new 
JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-               Map<String, String> queryParams = new HashMap<>();
-
-               pathParams.put("jobid", "jobid");
-               pathParams.put("vertexid", "taskid");
-
-               //-----empty string
-               queryParams.put("get", "");
-
-               try {
-                       assertEquals("", handler.handleJsonRequest(pathParams, 
queryParams, null).get());
-               } catch (Exception e) {
-                       fail(e.getMessage());
-               }
-
-               //-----invalid variable
-               pathParams.put("jobid", "nonexistent");
-               queryParams.put("get", "subindex.opname.abc.metric5");
-
-               try {
-                       assertEquals("", handler.handleJsonRequest(pathParams, 
queryParams, null).get());
-               } catch (Exception e) {
-                       fail(e.getMessage());
-               }
-
-               //-----invalid metric
-               pathParams.put("jobid", "nonexistant");
-               queryParams.put("get", "subindex.opname.abc.nonexistant");
-
-               try {
-                       assertEquals("", handler.handleJsonRequest(pathParams, 
queryParams, null).get());
-               } catch (Exception e) {
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
deleted file mode 100644
index 6d17b40..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobManagerMetricsHandler.
- */
-public class JobManagerMetricsHandlerTest extends TestLogger {
-       @Test
-       public void testGetPaths() {
-               JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobmanager/metrics", paths[0]);
-       }
-
-       @Test
-       public void getMapFor() {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-               JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-
-               Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-               assertEquals("0", metrics.get("abc.metric1"));
-       }
-
-       @Test
-       public void getMapForNull() {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStore store = fetcher.getMetricStore();
-
-               JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-
-               Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-               assertNotNull(metrics);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
deleted file mode 100644
index b26ceab..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobMetricsHandler.
- */
-public class JobMetricsHandlerTest extends TestLogger {
-       @Test
-       public void testGetPaths() {
-               JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/metrics", paths[0]);
-       }
-
-       @Test
-       public void getMapFor() throws Exception {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-               JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-               pathParams.put(PARAMETER_JOB_ID, "jobid");
-
-               Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-               assertEquals("2", metrics.get("abc.metric3"));
-       }
-
-       @Test
-       public void getMapForNull() {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStore store = fetcher.getMetricStore();
-
-               JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-
-               Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-               assertNull(metrics);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
deleted file mode 100644
index d637a4a..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
-import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobVertexMetricsHandler.
- */
-public class JobVertexMetricsHandlerTest extends TestLogger {
-       @Test
-       public void testGetPaths() {
-               JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]);
-       }
-
-       @Test
-       public void getMapFor() throws Exception {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-               JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-               pathParams.put(PARAMETER_JOB_ID, "jobid");
-               pathParams.put(PARAMETER_VERTEX_ID, "taskid");
-
-               Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-               assertEquals("3", metrics.get("8.abc.metric4"));
-
-               assertEquals("4", metrics.get("8.opname.abc.metric5"));
-       }
-
-       @Test
-       public void getMapForNull() {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStore store = fetcher.getMetricStore();
-
-               JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-
-               Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-               assertNull(metrics);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
deleted file mode 100644
index 09a0829..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-import org.apache.flink.runtime.metrics.util.TestingHistogram;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-/**
- * Tests for the MetricFetcher.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(MetricFetcher.class)
-public class MetricFetcherTest extends TestLogger {
-       @Test
-       public void testUpdate() throws Exception {
-               final Time timeout = Time.seconds(10L);
-
-               // ========= setup TaskManager =================================================================================
-               JobID jobID = new JobID();
-               InstanceID tmID = new InstanceID();
-               ResourceID tmRID = new ResourceID(tmID.toString());
-               TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
-               when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
-
-               Instance taskManager = mock(Instance.class);
-               when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-               when(taskManager.getId()).thenReturn(tmID);
-               when(taskManager.getTaskManagerID()).thenReturn(tmRID);
-
-               // ========= setup JobManager ==================================================================================
-               JobDetails details = mock(JobDetails.class);
-               when(details.getJobId()).thenReturn(jobID);
-
-               JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-
-               when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
-                       .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
-               when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
-                       .thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
-               when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
-
-               GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
-               when(retriever.getNow())
-                       .thenReturn(Optional.of(jobManagerGateway));
-
-               // ========= setup QueryServices ================================================================================
-               MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
-               MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
-
-               MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
-
-               when(jmQueryService.queryMetrics(any(Time.class)))
-                       .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
-               when(tmQueryService.queryMetrics(any(Time.class)))
-                       .thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
-
-               MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
-               when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
-               when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
-
-               // ========= start MetricFetcher testing =======================================================================
-               MetricFetcher fetcher = new MetricFetcher(
-                       retriever,
-                       queryServiceRetriever,
-                       Executors.directExecutor(),
-                       timeout);
-
-               // verify that update fetches metrics and updates the store
-               fetcher.update();
-               MetricStore store = fetcher.getMetricStore();
-               synchronized (store) {
-                       assertEquals("7", store.jobManager.metrics.get("abc.hist_min"));
-                       assertEquals("6", store.jobManager.metrics.get("abc.hist_max"));
-                       assertEquals("4.0", store.jobManager.metrics.get("abc.hist_mean"));
-                       assertEquals("0.5", store.jobManager.metrics.get("abc.hist_median"));
-                       assertEquals("5.0", store.jobManager.metrics.get("abc.hist_stddev"));
-                       assertEquals("0.75", store.jobManager.metrics.get("abc.hist_p75"));
-                       assertEquals("0.9", store.jobManager.metrics.get("abc.hist_p90"));
-                       assertEquals("0.95", store.jobManager.metrics.get("abc.hist_p95"));
-                       assertEquals("0.98", store.jobManager.metrics.get("abc.hist_p98"));
-                       assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99"));
-                       assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999"));
-
-                       assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge"));
-                       assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
-                       assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc"));
-                       assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc"));
-               }
-       }
-
-       private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) {
-               Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
-               Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
-               Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
-               Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
-
-               SimpleCounter c1 = new SimpleCounter();
-               SimpleCounter c2 = new SimpleCounter();
-
-               c1.inc(1);
-               c2.inc(2);
-
-               counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc"));
-               counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc"));
-               meters.put(new Meter() {
-                       @Override
-                       public void markEvent() {
-                       }
-
-                       @Override
-                       public void markEvent(long n) {
-                       }
-
-                       @Override
-                       public double getRate() {
-                               return 5;
-                       }
-
-                       @Override
-                       public long getCount() {
-                               return 10;
-                       }
-               }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
-               gauges.put(new Gauge<String>() {
-                       @Override
-                       public String getValue() {
-                               return "x";
-                       }
-               }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
-               histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
-
-               MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
-               MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-               serializer.close();
-
-               return dump;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
deleted file mode 100644
index d19e8c6..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the MetricStore.
- */
-public class MetricStoreTest extends TestLogger {
-       @Test
-       public void testAdd() throws IOException {
-               MetricStore store = setupStore(new MetricStore());
-
-               assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1"));
-               assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
-               assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
-               assertEquals("3", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric4", "-1"));
-               assertEquals("4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.opname.abc.metric5", "-1"));
-       }
-
-       @Test
-       public void testMalformedNameHandling() {
-               MetricStore store = new MetricStore();
-               //-----verify that no exceptions are thrown
-
-               // null
-               store.add(null);
-               // empty name
-               QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo("");
-               MetricDump.CounterDump cd = new MetricDump.CounterDump(info, "", 0);
-               store.add(cd);
-
-               //-----verify that no side effects occur
-               assertEquals(0, store.jobManager.metrics.size());
-               assertEquals(0, store.taskManagers.size());
-               assertEquals(0, store.jobs.size());
-       }
-
-       public static MetricStore setupStore(MetricStore store) {
-               QueryScopeInfo.JobManagerQueryScopeInfo jm = new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
-               MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0);
-
-               QueryScopeInfo.TaskManagerQueryScopeInfo tm = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
-               MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm, "metric2", 1);
-
-               QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
-               MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2);
-
-               QueryScopeInfo.TaskQueryScopeInfo task = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
-               MetricDump.CounterDump cd4 = new MetricDump.CounterDump(task, "metric4", 3);
-
-               QueryScopeInfo.OperatorQueryScopeInfo operator = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc");
-               MetricDump.CounterDump cd5 = new MetricDump.CounterDump(operator, "metric5", 4);
-
-               store.add(cd1);
-               store.add(cd2);
-               store.add(cd3);
-               store.add(cd4);
-               store.add(cd5);
-
-               return store;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
deleted file mode 100644
index 9c5549e..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler.TASK_MANAGER_ID_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the TaskManagerMetricsHandler.
- */
-public class TaskManagerMetricsHandlerTest extends TestLogger {
-       @Test
-       public void testGetPaths() {
-               TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]);
-       }
-
-       @Test
-       public void getMapFor() throws Exception {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-               TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-               pathParams.put(TASK_MANAGER_ID_KEY, "tmid");
-
-               Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-               assertEquals("1", metrics.get("abc.metric2"));
-       }
-
-       @Test
-       public void getMapForNull() {
-               MetricFetcher fetcher = new MetricFetcher(
-                       mock(GatewayRetriever.class),
-                       mock(MetricQueryServiceRetriever.class),
-                       Executors.directExecutor(),
-                       TestingUtils.TIMEOUT());
-               MetricStore store = fetcher.getMetricStore();
-
-               TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
-               Map<String, String> pathParams = new HashMap<>();
-
-               Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-               assertNull(metrics);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
deleted file mode 100644
index 979d943..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * Utility class for constructing an ArchivedExecution.
- */
-public class ArchivedExecutionBuilder {
-       private ExecutionAttemptID attemptId;
-       private long[] stateTimestamps;
-       private int attemptNumber;
-       private ExecutionState state;
-       private String failureCause;
-       private TaskManagerLocation assignedResourceLocation;
-       private StringifiedAccumulatorResult[] userAccumulators;
-       private IOMetrics ioMetrics;
-       private int parallelSubtaskIndex;
-
-       public ArchivedExecutionBuilder setAttemptId(ExecutionAttemptID 
attemptId) {
-               this.attemptId = attemptId;
-               return this;
-       }
-
-       public ArchivedExecutionBuilder setStateTimestamps(long[] 
stateTimestamps) {
-               Preconditions.checkArgument(stateTimestamps.length == 
ExecutionState.values().length);
-               this.stateTimestamps = stateTimestamps;
-               return this;
-       }
-
-       public ArchivedExecutionBuilder setAttemptNumber(int attemptNumber) {
-               this.attemptNumber = attemptNumber;
-               return this;
-       }
-
-       public ArchivedExecutionBuilder setState(ExecutionState state) {
-               this.state = state;
-               return this;
-       }
-
-       public ArchivedExecutionBuilder setFailureCause(String failureCause) {
-               this.failureCause = failureCause;
-               return this;
-       }
-
-       public ArchivedExecutionBuilder 
setAssignedResourceLocation(TaskManagerLocation assignedResourceLocation) {
-               this.assignedResourceLocation = assignedResourceLocation;
-               return this;
-       }
-
-       public ArchivedExecutionBuilder 
setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) {
-               this.userAccumulators = userAccumulators;
-               return this;
-       }
-
-       public ArchivedExecutionBuilder setParallelSubtaskIndex(int 
parallelSubtaskIndex) {
-               this.parallelSubtaskIndex = parallelSubtaskIndex;
-               return this;
-       }
-
-       public ArchivedExecutionBuilder setIOMetrics(IOMetrics ioMetrics) {
-               this.ioMetrics = ioMetrics;
-               return this;
-       }
-
-       public ArchivedExecution build() throws UnknownHostException {
-               return new ArchivedExecution(
-                       userAccumulators != null ? userAccumulators : new 
StringifiedAccumulatorResult[0],
-                       ioMetrics != null ? ioMetrics : new TestIOMetrics(),
-                       attemptId != null ? attemptId : new 
ExecutionAttemptID(),
-                       attemptNumber,
-                       state != null ? state : ExecutionState.FINISHED,
-                       failureCause != null ? failureCause : "(null)",
-                       assignedResourceLocation != null ? 
assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), 
InetAddress.getLocalHost(), 1234),
-                       parallelSubtaskIndex,
-                       stateTimestamps != null ? stateTimestamps : new 
long[]{1, 2, 3, 4, 5, 5, 5, 5}
-               );
-       }
-
-       private static class TestIOMetrics extends IOMetrics {
-               private static final long serialVersionUID = 
-5920076211680012555L;
-
-               public TestIOMetrics() {
-                       super(
-                               new MeterView(new TestCounter(1), 0),
-                               new MeterView(new TestCounter(2), 0),
-                               new MeterView(new TestCounter(3), 0),
-                               new MeterView(new TestCounter(4), 0),
-                               new MeterView(new TestCounter(5), 0));
-               }
-       }
-
-       private static class TestCounter implements Counter {
-               private final long count;
-
-               private TestCounter(long count) {
-                       this.count = count;
-               }
-
-               @Override
-               public void inc() {
-               }
-
-               @Override
-               public void inc(long n) {
-               }
-
-               @Override
-               public void dec() {
-               }
-
-               @Override
-               public void dec(long n) {
-               }
-
-               @Override
-               public long getCount() {
-                       return count;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
deleted file mode 100644
index 053f718..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.ExecutionMode;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Utility class for constructing an ArchivedExecutionConfig.
- */
-public class ArchivedExecutionConfigBuilder {
-       private String executionMode;
-       private String restartStrategyDescription;
-       private int parallelism;
-       private boolean objectReuseEnabled;
-       private Map<String, String> globalJobParameters;
-
-       public ArchivedExecutionConfigBuilder setExecutionMode(String 
executionMode) {
-               this.executionMode = executionMode;
-               return this;
-       }
-
-       public ArchivedExecutionConfigBuilder 
setRestartStrategyDescription(String restartStrategyDescription) {
-               this.restartStrategyDescription = restartStrategyDescription;
-               return this;
-       }
-
-       public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
-               this.parallelism = parallelism;
-               return this;
-       }
-
-       public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean 
objectReuseEnabled) {
-               this.objectReuseEnabled = objectReuseEnabled;
-               return this;
-       }
-
-       public ArchivedExecutionConfigBuilder 
setGlobalJobParameters(Map<String, String> globalJobParameters) {
-               this.globalJobParameters = globalJobParameters;
-               return this;
-       }
-
-       public ArchivedExecutionConfig build() {
-               return new ArchivedExecutionConfig(
-                       executionMode != null ? executionMode : 
ExecutionMode.PIPELINED.name(),
-                       restartStrategyDescription != null ? 
restartStrategyDescription : "default",
-                       parallelism,
-                       objectReuseEnabled,
-                       globalJobParameters != null ? globalJobParameters : 
Collections.<String, String>emptyMap()
-               );
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
deleted file mode 100644
index 57b300a..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionGraph.
- */
-public class ArchivedExecutionGraphBuilder {
-
-       private static final Random RANDOM = new Random();
-
-       private JobID jobID;
-       private String jobName;
-       private Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
-       private List<ArchivedExecutionJobVertex> verticesInCreationOrder;
-       private long[] stateTimestamps;
-       private JobStatus state;
-       private ErrorInfo failureCause;
-       private String jsonPlan;
-       private StringifiedAccumulatorResult[] archivedUserAccumulators;
-       private ArchivedExecutionConfig archivedExecutionConfig;
-       private boolean isStoppable;
-       private Map<String, SerializedValue<Object>> serializedUserAccumulators;
-
-       public ArchivedExecutionGraphBuilder setJobID(JobID jobID) {
-               this.jobID = jobID;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder setJobName(String jobName) {
-               this.jobName = jobName;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, 
ArchivedExecutionJobVertex> tasks) {
-               this.tasks = tasks;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder 
setVerticesInCreationOrder(List<ArchivedExecutionJobVertex> 
verticesInCreationOrder) {
-               this.verticesInCreationOrder = verticesInCreationOrder;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder setStateTimestamps(long[] 
stateTimestamps) {
-               Preconditions.checkArgument(stateTimestamps.length == 
JobStatus.values().length);
-               this.stateTimestamps = stateTimestamps;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder setState(JobStatus state) {
-               this.state = state;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo 
failureCause) {
-               this.failureCause = failureCause;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) {
-               this.jsonPlan = jsonPlan;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder 
setArchivedUserAccumulators(StringifiedAccumulatorResult[] 
archivedUserAccumulators) {
-               this.archivedUserAccumulators = archivedUserAccumulators;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder 
setArchivedExecutionConfig(ArchivedExecutionConfig archivedExecutionConfig) {
-               this.archivedExecutionConfig = archivedExecutionConfig;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) {
-               isStoppable = stoppable;
-               return this;
-       }
-
-       public ArchivedExecutionGraphBuilder 
setSerializedUserAccumulators(Map<String, SerializedValue<Object>> 
serializedUserAccumulators) {
-               this.serializedUserAccumulators = serializedUserAccumulators;
-               return this;
-       }
-
-       public ArchivedExecutionGraph build() {
-               Preconditions.checkNotNull(tasks, "Tasks must not be null.");
-               JobID jobID = this.jobID != null ? this.jobID : new JobID();
-               String jobName = this.jobName != null ? this.jobName : "job_" + 
RANDOM.nextInt();
-               return new ArchivedExecutionGraph(
-                       jobID,
-                       jobName,
-                       tasks,
-                       verticesInCreationOrder != null ? 
verticesInCreationOrder : new ArrayList<>(tasks.values()),
-                       stateTimestamps != null ? stateTimestamps : new 
long[JobStatus.values().length],
-                       state != null ? state : JobStatus.FINISHED,
-                       failureCause,
-                       jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + 
"\", \"name\":\"" + jobName + "\", \"nodes\":[]}",
-                       archivedUserAccumulators != null ? 
archivedUserAccumulators : new StringifiedAccumulatorResult[0],
-                       serializedUserAccumulators != null ? 
serializedUserAccumulators : Collections.<String, 
SerializedValue<Object>>emptyMap(),
-                       archivedExecutionConfig != null ? 
archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(),
-                       isStoppable,
-                       null,
-                       null
-               );
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
deleted file mode 100644
index 3ef4106..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionJobVertex.
- */
-public class ArchivedExecutionJobVertexBuilder {
-
-       private static final Random RANDOM = new Random();
-
-       private ArchivedExecutionVertex[] taskVertices;
-       private JobVertexID id;
-       private String name;
-       private int parallelism;
-       private int maxParallelism;
-       private StringifiedAccumulatorResult[] archivedUserAccumulators;
-
-       public ArchivedExecutionJobVertexBuilder 
setTaskVertices(ArchivedExecutionVertex[] taskVertices) {
-               this.taskVertices = taskVertices;
-               return this;
-       }
-
-       public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) {
-               this.id = id;
-               return this;
-       }
-
-       public ArchivedExecutionJobVertexBuilder setName(String name) {
-               this.name = name;
-               return this;
-       }
-
-       public ArchivedExecutionJobVertexBuilder setParallelism(int 
parallelism) {
-               this.parallelism = parallelism;
-               return this;
-       }
-
-       public ArchivedExecutionJobVertexBuilder setMaxParallelism(int 
maxParallelism) {
-               this.maxParallelism = maxParallelism;
-               return this;
-       }
-
-       public ArchivedExecutionJobVertexBuilder 
setArchivedUserAccumulators(StringifiedAccumulatorResult[] 
archivedUserAccumulators) {
-               this.archivedUserAccumulators = archivedUserAccumulators;
-               return this;
-       }
-
-       public ArchivedExecutionJobVertex build() {
-               Preconditions.checkNotNull(taskVertices);
-               return new ArchivedExecutionJobVertex(
-                       taskVertices,
-                       id != null ? id : new JobVertexID(),
-                       name != null ? name : "task_" + RANDOM.nextInt(),
-                       parallelism,
-                       maxParallelism,
-                       archivedUserAccumulators != null ? 
archivedUserAccumulators : new StringifiedAccumulatorResult[0]
-               );
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
deleted file mode 100644
index 67e9e11..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.util.EvictingBoundedList;
-import org.apache.flink.util.Preconditions;
-
-import java.util.List;
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionVertex.
- */
-public class ArchivedExecutionVertexBuilder {
-
-       private static final Random RANDOM = new Random();
-
-       private int subtaskIndex;
-       private EvictingBoundedList<ArchivedExecution> priorExecutions;
-       private String taskNameWithSubtask;
-       private ArchivedExecution currentExecution;
-
-       public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) 
{
-               this.subtaskIndex = subtaskIndex;
-               return this;
-       }
-
-       public ArchivedExecutionVertexBuilder 
setPriorExecutions(List<ArchivedExecution> priorExecutions) {
-               this.priorExecutions = new 
EvictingBoundedList<>(priorExecutions.size());
-               for (ArchivedExecution execution : priorExecutions) {
-                       this.priorExecutions.add(execution);
-               }
-               return this;
-       }
-
-       public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String 
taskNameWithSubtask) {
-               this.taskNameWithSubtask = taskNameWithSubtask;
-               return this;
-       }
-
-       public ArchivedExecutionVertexBuilder 
setCurrentExecution(ArchivedExecution currentExecution) {
-               this.currentExecution = currentExecution;
-               return this;
-       }
-
-       public ArchivedExecutionVertex build() {
-               Preconditions.checkNotNull(currentExecution);
-               return new ArchivedExecutionVertex(
-                       subtaskIndex,
-                       taskNameWithSubtask != null ? taskNameWithSubtask : 
"task_" + RANDOM.nextInt() + "_" + subtaskIndex,
-                       currentExecution,
-                       priorExecutions != null ? priorExecutions : new 
EvictingBoundedList<ArchivedExecution>(0)
-               );
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
deleted file mode 100644
index 3e4fc01..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Common entry-point for accessing generated ArchivedExecution* components.
- */
-public class ArchivedJobGenerationUtils {
-       public static final ObjectMapper MAPPER = new ObjectMapper();
-       public static final JsonFactory JACKSON_FACTORY = new JsonFactory()
-               .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
-               .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
-
-       private static ArchivedExecutionGraph originalJob;
-       private static ArchivedExecutionJobVertex originalTask;
-       private static ArchivedExecutionVertex originalSubtask;
-       private static ArchivedExecution originalAttempt;
-
-       private static final Object lock = new Object();
-
-       private ArchivedJobGenerationUtils() {
-       }
-
-       public static AccessExecutionGraph getTestJob() throws Exception {
-               synchronized (lock) {
-                       if (originalJob == null) {
-                               generateArchivedJob();
-                       }
-               }
-               return originalJob;
-       }
-
-       public static AccessExecutionJobVertex getTestTask() throws Exception {
-               synchronized (lock) {
-                       if (originalJob == null) {
-                               generateArchivedJob();
-                       }
-               }
-               return originalTask;
-       }
-
-       public static AccessExecutionVertex getTestSubtask() throws Exception {
-               synchronized (lock) {
-                       if (originalJob == null) {
-                               generateArchivedJob();
-                       }
-               }
-               return originalSubtask;
-       }
-
-       public static AccessExecution getTestAttempt() throws Exception {
-               synchronized (lock) {
-                       if (originalJob == null) {
-                               generateArchivedJob();
-                       }
-               }
-               return originalAttempt;
-       }
-
-       private static void generateArchivedJob() throws Exception {
-               // Attempt
-               StringifiedAccumulatorResult acc1 = new 
StringifiedAccumulatorResult("name1", "type1", "value1");
-               StringifiedAccumulatorResult acc2 = new 
StringifiedAccumulatorResult("name2", "type2", "value2");
-               TaskManagerLocation location = new TaskManagerLocation(new 
ResourceID("hello"), InetAddress.getLocalHost(), 1234);
-               originalAttempt = new ArchivedExecutionBuilder()
-                       .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 
9})
-                       .setParallelSubtaskIndex(1)
-                       .setAttemptNumber(0)
-                       .setAssignedResourceLocation(location)
-                       .setUserAccumulators(new 
StringifiedAccumulatorResult[]{acc1, acc2})
-                       .setState(ExecutionState.FINISHED)
-                       .setFailureCause("attemptException")
-                       .build();
-               // Subtask
-               originalSubtask = new ArchivedExecutionVertexBuilder()
-                       
.setSubtaskIndex(originalAttempt.getParallelSubtaskIndex())
-                       .setTaskNameWithSubtask("hello(1/1)")
-                       .setCurrentExecution(originalAttempt)
-                       .build();
-               // Task
-               originalTask = new ArchivedExecutionJobVertexBuilder()
-                       .setTaskVertices(new 
ArchivedExecutionVertex[]{originalSubtask})
-                       .build();
-               // Job
-               Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new 
HashMap<>();
-               tasks.put(originalTask.getJobVertexId(), originalTask);
-               originalJob = new ArchivedExecutionGraphBuilder()
-                       .setJobID(new JobID())
-                       .setTasks(tasks)
-                       .setFailureCause(new ErrorInfo(new 
Exception("jobException"), 
originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
-                       .setState(JobStatus.FINISHED)
-                       .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 
9, 10})
-                       .setArchivedUserAccumulators(new 
StringifiedAccumulatorResult[]{acc1, acc2})
-                       .build();
-       }
-
-       // 
========================================================================
-       // utility methods
-       // 
========================================================================
-
-       public static void 
compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, 
ArrayNode writtenAccs) {
-               assertEquals(expectedAccs.length, writtenAccs.size());
-               for (int x = 0; x < expectedAccs.length; x++) {
-                       JsonNode acc = writtenAccs.get(x);
-
-                       assertEquals(expectedAccs[x].getName(), 
acc.get("name").asText());
-                       assertEquals(expectedAccs[x].getType(), 
acc.get("type").asText());
-                       assertEquals(expectedAccs[x].getValue(), 
acc.get("value").asText());
-               }
-       }
-
-       public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode 
writtenMetrics) {
-               assertEquals(expectedMetrics.getNumBytesInTotal(), 
writtenMetrics.get("read-bytes").asLong());
-               assertEquals(expectedMetrics.getNumBytesOut(), 
writtenMetrics.get("write-bytes").asLong());
-               assertEquals(expectedMetrics.getNumRecordsIn(), 
writtenMetrics.get("read-records").asLong());
-               assertEquals(expectedMetrics.getNumRecordsOut(), 
writtenMetrics.get("write-records").asLong());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
new file mode 100644
index 0000000..4ad1759
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+/**
+ * A special exception that indicates that an element was not found and that the
+ * request should be answered with a {@code 404} return code.
+ */
+public class NotFoundException extends Exception {
+
+       private static final long serialVersionUID = -4036006746423754639L;
+
+       public NotFoundException(String message) {
+               super(message);
+       }
+}

Reply via email to