http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
deleted file mode 100644
index fde16fc..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns for a given task manager a list of all 
available metrics or the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
- * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested 
metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
-
-       private static final String TASKMANAGER_METRICS_REST_PATH = 
"/taskmanagers/:taskmanagerid/metrics";
-
-       public TaskManagerMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
-               super(executor, fetcher);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{TASKMANAGER_METRICS_REST_PATH};
-       }
-
-       @Override
-       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.TaskManagerMetricStore taskManager = 
metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY));
-               if (taskManager == null) {
-                       return null;
-               } else {
-                       return taskManager.metrics;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
deleted file mode 100644
index 9d71786..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-
-/**
- * This class is a mutable version of the {@link IOMetrics} class that allows 
adding up IO-related metrics.
- *
- * <p>For finished jobs these metrics are stored in the {@link ExecutionGraph} 
as another {@link IOMetrics}.
- * For running jobs these metrics are retrieved using the {@link 
MetricFetcher}.
- *
- * <p>This class provides a common interface to handle both cases, reducing 
complexity in various handlers (like
- * the {@link JobVertexDetailsHandler}).
- */
-public class MutableIOMetrics extends IOMetrics {
-
-       private static final long serialVersionUID = -5460777634971381737L;
-
-       public MutableIOMetrics() {
-               super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
-       }
-
-       /**
-        * Adds the IO metrics for the given attempt to this object. If the 
{@link AccessExecution} is in
-        * a terminal state the contained {@link IOMetrics} object is added. 
Otherwise the given {@link MetricFetcher} is
-        * used to retrieve the required metrics.
-        *
-        * @param attempt Attempt whose IO metrics should be added
-        * @param fetcher MetricFetcher to retrieve metrics for running jobs
-        * @param jobID JobID to which the attempt belongs
-        * @param taskID TaskID to which the attempt belongs
-        */
-       public void addIOMetrics(AccessExecution attempt, @Nullable 
MetricFetcher fetcher, String jobID, String taskID) {
-               if (attempt.getState().isTerminal()) {
-                       IOMetrics ioMetrics = attempt.getIOMetrics();
-                       if (ioMetrics != null) { // execAttempt is already 
finished, use final metrics stored in ExecutionGraph
-                               this.numBytesInLocal += 
ioMetrics.getNumBytesInLocal();
-                               this.numBytesInRemote += 
ioMetrics.getNumBytesInRemote();
-                               this.numBytesOut += ioMetrics.getNumBytesOut();
-                               this.numRecordsIn += 
ioMetrics.getNumRecordsIn();
-                               this.numRecordsOut += 
ioMetrics.getNumRecordsOut();
-                       }
-               } else { // execAttempt is still running, use 
MetricQueryService instead
-                       if (fetcher != null) {
-                               fetcher.update();
-                               MetricStore.SubtaskMetricStore metrics = 
fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, 
attempt.getParallelSubtaskIndex());
-                               if (metrics != null) {
-                                       this.numBytesInLocal += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
-                                       this.numBytesInRemote += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-                                       this.numBytesOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-                                       this.numRecordsIn += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-                                       this.numRecordsOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Writes the IO metrics contained in this object to the given {@link 
JsonGenerator}.
-        *
-        * <p>The JSON structure written is as follows:
-        * "metrics": {
-        *     "read-bytes": 1,
-        *     "write-bytes": 2,
-        *     "read-records": 3,
-        *     "write-records": 4
-        * }
-        *
-        * @param gen JsonGenerator to which the metrics should be written
-        * @throws IOException
-        */
-       public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
-               gen.writeObjectFieldStart("metrics");
-               gen.writeNumberField("read-bytes", this.numBytesInLocal + 
this.numBytesInRemote);
-               gen.writeNumberField("write-bytes", this.numBytesOut);
-               gen.writeNumberField("read-records", this.numRecordsIn);
-               gen.writeNumberField("write-records", this.numRecordsOut);
-               gen.writeEndObject();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
deleted file mode 100644
index 0e4734d..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.duration.FiniteDuration;
-
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Simple back pressured task test.
- */
-public class BackPressureStatsTrackerITCase extends TestLogger {
-
-       private static NetworkBufferPool networkBufferPool;
-       private static ActorSystem testActorSystem;
-
-       /** Shared as static variable with the test task. */
-       private static BufferPool testBufferPool;
-
-       @BeforeClass
-       public static void setup() {
-               testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-               networkBufferPool = new NetworkBufferPool(100, 8192, 
MemoryType.HEAP);
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(testActorSystem);
-               networkBufferPool.destroy();
-       }
-
-       /**
-        * Tests a simple fake-back pressured task. Back pressure is assumed 
when
-        * sampled stack traces are in blocking buffer requests.
-        */
-       @Test
-       public void testBackPressuredProducer() throws Exception {
-               new JavaTestKit(testActorSystem) {{
-                       final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
-
-                       // The JobGraph
-                       final JobGraph jobGraph = new JobGraph();
-                       final int parallelism = 4;
-
-                       final JobVertex task = new JobVertex("Task");
-                       task.setInvokableClass(BackPressuredTask.class);
-                       task.setParallelism(parallelism);
-
-                       jobGraph.addVertex(task);
-
-                       final Configuration config = new Configuration();
-
-                       final HighAvailabilityServices highAvailabilityServices 
= HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-                               config,
-                               TestingUtils.defaultExecutor());
-
-                       ActorGateway jobManger = null;
-                       ActorGateway taskManager = null;
-
-                       //
-                       // 1) Consume all buffers at first (no buffers for the 
test task)
-                       //
-                       testBufferPool = networkBufferPool.createBufferPool(1, 
Integer.MAX_VALUE);
-                       final List<Buffer> buffers = new ArrayList<>();
-                       while (true) {
-                               Buffer buffer = testBufferPool.requestBuffer();
-                               if (buffer != null) {
-                                       buffers.add(buffer);
-                               } else {
-                                       break;
-                               }
-                       }
-
-                       try {
-                               jobManger = TestingUtils.createJobManager(
-                                       testActorSystem,
-                                       TestingUtils.defaultExecutor(),
-                                       TestingUtils.defaultExecutor(),
-                                       config,
-                                       highAvailabilityServices);
-
-                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
-
-                               taskManager = TestingUtils.createTaskManager(
-                                       testActorSystem,
-                                       highAvailabilityServices,
-                                       config,
-                                       true,
-                                       true);
-
-                               final ActorGateway jm = jobManger;
-
-                               new Within(deadline) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                                                       // Submit the job and 
wait until it is running
-                                                       
JobClient.submitJobDetached(
-                                                                       new 
AkkaJobManagerGateway(jm),
-                                                                       config,
-                                                                       
jobGraph,
-                                                                       
Time.milliseconds(deadline.toMillis()),
-                                                                       
ClassLoader.getSystemClassLoader());
-
-                                                       jm.tell(new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
-
-                                                       expectMsgEquals(new 
AllVerticesRunning(jobGraph.getJobID()));
-
-                                                       // Get the 
ExecutionGraph
-                                                       jm.tell(new 
RequestExecutionGraph(jobGraph.getJobID()), testActor);
-
-                                                       ExecutionGraphFound 
executionGraphResponse =
-                                                                       
expectMsgClass(ExecutionGraphFound.class);
-
-                                                       ExecutionGraph 
executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
-                                                       ExecutionJobVertex 
vertex = executionGraph.getJobVertex(task.getID());
-
-                                                       
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
-                                                                       
testActorSystem.dispatcher(), 60000);
-
-                                                       // Verify back pressure 
(clean up interval can be ignored)
-                                                       
BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(
-                                                               coordinator,
-                                                               100 * 1000,
-                                                               20,
-                                                               
Time.milliseconds(10L));
-
-                                                       int numAttempts = 10;
-
-                                                       int nextSampleId = 0;
-
-                                                       // Verify that all 
tasks are back pressured. This
-                                                       // can fail if the task 
takes longer to request
-                                                       // the buffer.
-                                                       for (int attempt = 0; 
attempt < numAttempts; attempt++) {
-                                                               try {
-                                                                       
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
-
-                                                                       
assertEquals(nextSampleId + attempt, stats.getSampleId());
-                                                                       
assertEquals(parallelism, stats.getNumberOfSubTasks());
-                                                                       
assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);
-
-                                                                       for 
(int i = 0; i < parallelism; i++) {
-                                                                               
assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
-                                                                       }
-
-                                                                       
nextSampleId = stats.getSampleId() + 1;
-
-                                                                       break;
-                                                               } catch 
(Throwable t) {
-                                                                       if 
(attempt == numAttempts - 1) {
-                                                                               
throw t;
-                                                                       } else {
-                                                                               
Thread.sleep(500);
-                                                                       }
-                                                               }
-                                                       }
-
-                                                       //
-                                                       // 2) Release all 
buffers and let the tasks grab one
-                                                       //
-                                                       for (Buffer buf : 
buffers) {
-                                                               buf.recycle();
-                                                       }
-
-                                                       // Wait for all buffers 
to be available. The tasks
-                                                       // grab them and then 
immediately release them.
-                                                       while 
(testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
-                                                               
Thread.sleep(100);
-                                                       }
-
-                                                       // Verify that no task 
is back pressured any more.
-                                                       for (int attempt = 0; 
attempt < numAttempts; attempt++) {
-                                                               try {
-                                                                       
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
-
-                                                                       
assertEquals(nextSampleId + attempt, stats.getSampleId());
-                                                                       
assertEquals(parallelism, stats.getNumberOfSubTasks());
-
-                                                                       // 
Verify that no task is back pressured
-                                                                       for 
(int i = 0; i < parallelism; i++) {
-                                                                               
assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
-                                                                       }
-
-                                                                       break;
-                                                               } catch 
(Throwable t) {
-                                                                       if 
(attempt == numAttempts - 1) {
-                                                                               
throw t;
-                                                                       } else {
-                                                                               
Thread.sleep(500);
-                                                                       }
-                                                               }
-                                                       }
-
-                                                       // Shut down
-                                                       jm.tell(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);
-
-                                                       // Cancel job
-                                                       jm.tell(new 
JobManagerMessages.CancelJob(jobGraph.getJobID()));
-
-                                                       // Response to removal 
notification
-                                                       expectMsgEquals(true);
-
-                                                       //
-                                                       // 3) Trigger stats for 
archived job
-                                                       //
-                                                       
statsTracker.invalidateOperatorStatsCache();
-                                                       assertFalse("Unexpected 
trigger", statsTracker.triggerStackTraceSample(vertex));
-
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       } finally {
-                               TestingUtils.stopActor(jobManger);
-                               TestingUtils.stopActor(taskManager);
-
-                               
highAvailabilityServices.closeAndCleanupAllData();
-
-                               for (Buffer buf : buffers) {
-                                       buf.recycle();
-                               }
-
-                               testBufferPool.lazyDestroy();
-                       }
-               }};
-       }
-
-       /**
-        * Triggers a new stats sample.
-        */
-       private OperatorBackPressureStats triggerStatsSample(
-                       BackPressureStatsTracker statsTracker,
-                       ExecutionJobVertex vertex) throws InterruptedException {
-
-               statsTracker.invalidateOperatorStatsCache();
-               assertTrue("Failed to trigger", 
statsTracker.triggerStackTraceSample(vertex));
-
-               // Sleep minimum duration
-               Thread.sleep(20 * 10);
-
-               Option<OperatorBackPressureStats> stats;
-
-               // Get the stats
-               while ((stats = 
statsTracker.getOperatorBackPressureStats(vertex)).isEmpty()) {
-                       Thread.sleep(10);
-               }
-
-               return stats.get();
-       }
-
-       /**
-        * A back pressured producer sharing a {@link BufferPool} with the
-        * test driver.
-        */
-       public static class BackPressuredTask extends AbstractInvokable {
-
-               @Override
-               public void invoke() throws Exception {
-                       while (true) {
-                               Buffer buffer = 
testBufferPool.requestBufferBlocking();
-                               // Got a buffer, yay!
-                               buffer.recycle();
-
-                               new CountDownLatch(1).await();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
deleted file mode 100644
index e99d1b7..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the BackPressureStatsTracker.
- */
-public class BackPressureStatsTrackerTest extends TestLogger {
-
-       /** Tests simple statistics with fake stack traces. */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testTriggerStackTraceSample() throws Exception {
-               CompletableFuture<StackTraceSample> sampleFuture = new 
CompletableFuture<>();
-
-               StackTraceSampleCoordinator sampleCoordinator = 
mock(StackTraceSampleCoordinator.class);
-               when(sampleCoordinator.triggerStackTraceSample(
-                               any(ExecutionVertex[].class),
-                               anyInt(),
-                               any(Time.class),
-                               anyInt())).thenReturn(sampleFuture);
-
-               ExecutionGraph graph = mock(ExecutionGraph.class);
-               when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-               // Same Thread execution context
-               when(graph.getFutureExecutor()).thenReturn(new Executor() {
-
-                       @Override
-                       public void execute(Runnable runnable) {
-                               runnable.run();
-                       }
-               });
-
-               ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               when(jobVertex.getJobId()).thenReturn(new JobID());
-               when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
-               when(jobVertex.getGraph()).thenReturn(graph);
-               when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-               taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-               taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-               taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-               taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-               int numSamples = 100;
-               Time delayBetweenSamples = Time.milliseconds(100L);
-
-               BackPressureStatsTracker tracker = new BackPressureStatsTracker(
-                               sampleCoordinator, 9999, numSamples, 
delayBetweenSamples);
-
-               // Trigger
-               assertTrue("Failed to trigger", 
tracker.triggerStackTraceSample(jobVertex));
-
-               verify(sampleCoordinator).triggerStackTraceSample(
-                               eq(taskVertices),
-                               eq(numSamples),
-                               eq(delayBetweenSamples),
-                               
eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
-
-               // Trigger again for pending request, should not fire
-               assertFalse("Unexpected trigger", 
tracker.triggerStackTraceSample(jobVertex));
-
-               
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
-
-               verify(sampleCoordinator).triggerStackTraceSample(
-                               eq(taskVertices),
-                               eq(numSamples),
-                               eq(delayBetweenSamples),
-                               
eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
-
-               
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
-
-               // Complete the future
-               Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new 
HashMap<>();
-               for (ExecutionVertex vertex : taskVertices) {
-                       List<StackTraceElement[]> taskTraces = new 
ArrayList<>();
-
-                       for (int i = 0; i < taskVertices.length; i++) {
-                               // Traces until sub task index are back 
pressured
-                               taskTraces.add(createStackTrace(i <= 
vertex.getParallelSubtaskIndex()));
-                       }
-
-                       
traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
-               }
-
-               int sampleId = 1231;
-               int endTime = 841;
-
-               StackTraceSample sample = new StackTraceSample(
-                               sampleId,
-                               0,
-                               endTime,
-                               traces);
-
-               // Succeed the promise
-               sampleFuture.complete(sample);
-
-               
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isDefined());
-
-               OperatorBackPressureStats stats = 
tracker.getOperatorBackPressureStats(jobVertex).get();
-
-               // Verify the stats
-               assertEquals(sampleId, stats.getSampleId());
-               assertEquals(endTime, stats.getEndTimestamp());
-               assertEquals(taskVertices.length, stats.getNumberOfSubTasks());
-
-               for (int i = 0; i < taskVertices.length; i++) {
-                       double ratio = stats.getBackPressureRatio(i);
-                       // Traces until sub task index are back pressured
-                       assertEquals((i + 1) / ((double) 4), ratio, 0.0);
-               }
-       }
-
-       private StackTraceElement[] createStackTrace(boolean isBackPressure) {
-               if (isBackPressure) {
-                       return new StackTraceElement[] { new StackTraceElement(
-                                       
BackPressureStatsTracker.EXPECTED_CLASS_NAME,
-                                       
BackPressureStatsTracker.EXPECTED_METHOD_NAME,
-                                       "LocalBufferPool.java",
-                                       133) };
-               } else {
-                       return Thread.currentThread().getStackTrace();
-               }
-       }
-
-       private ExecutionVertex mockExecutionVertex(
-                       ExecutionJobVertex jobVertex,
-                       int subTaskIndex) {
-
-               Execution exec = mock(Execution.class);
-               when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
-
-               JobVertexID id = jobVertex.getJobVertexId();
-
-               ExecutionVertex vertex = mock(ExecutionVertex.class);
-               when(vertex.getJobvertexId()).thenReturn(id);
-               when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-               when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
-
-               return vertex;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
deleted file mode 100644
index bd12668..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import static org.junit.Assert.fail;
-
-/**
- * Simple stack trace sampling test.
- */
-public class StackTraceSampleCoordinatorITCase extends TestLogger {
-
-       private static ActorSystem testActorSystem;
-
-       @BeforeClass
-       public static void setup() {
-               testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(testActorSystem);
-       }
-
-       /**
-        * Tests that a cleared task is answered with a partial success 
response.
-        */
-       @Test
-       public void testTaskClearedWhileSampling() throws Exception {
-               new JavaTestKit(testActorSystem) {{
-                       final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
-
-                       // The JobGraph
-                       final JobGraph jobGraph = new JobGraph();
-                       final int parallelism = 1;
-
-                       final JobVertex task = new JobVertex("Task");
-                       task.setInvokableClass(BlockingNoOpInvokable.class);
-                       task.setParallelism(parallelism);
-
-                       jobGraph.addVertex(task);
-
-                       final Configuration config = new Configuration();
-
-                       final HighAvailabilityServices highAvailabilityServices 
= HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-                               config,
-                               TestingUtils.defaultExecutor());
-
-                       ActorGateway jobManger = null;
-                       ActorGateway taskManager = null;
-
-                       try {
-                               jobManger = TestingUtils.createJobManager(
-                                       testActorSystem,
-                                       TestingUtils.defaultExecutor(),
-                                       TestingUtils.defaultExecutor(),
-                                       config,
-                                       highAvailabilityServices);
-
-                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
-
-                               taskManager = TestingUtils.createTaskManager(
-                                       testActorSystem,
-                                       highAvailabilityServices,
-                                       config,
-                                       true,
-                                       true);
-
-                               final ActorGateway jm = jobManger;
-
-                               new Within(deadline) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                                                       int maxAttempts = 10;
-                                                       int sleepTime = 100;
-
-                                                       for (int i = 0; i < 
maxAttempts; i++, sleepTime *= 2) {
-                                                               // Submit the 
job and wait until it is running
-                                                               
JobClient.submitJobDetached(
-                                                                               
new AkkaJobManagerGateway(jm),
-                                                                               
config,
-                                                                               
jobGraph,
-                                                                               
Time.milliseconds(deadline.toMillis()),
-                                                                               
ClassLoader.getSystemClassLoader());
-
-                                                               jm.tell(new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
-
-                                                               
expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID()));
-
-                                                               // Get the 
ExecutionGraph
-                                                               jm.tell(new 
RequestExecutionGraph(jobGraph.getJobID()), testActor);
-                                                               
ExecutionGraphFound executionGraphResponse =
-                                                                               
expectMsgClass(ExecutionGraphFound.class);
-                                                               ExecutionGraph 
executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
-                                                               
ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
-
-                                                               
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
-                                                                               
testActorSystem.dispatcher(), 60000);
-
-                                                               
CompletableFuture<StackTraceSample> sampleFuture = 
coordinator.triggerStackTraceSample(
-                                                                       
vertex.getTaskVertices(),
-                                                                       // Do 
this often so we have a good
-                                                                       // 
chance of removing the job during
-                                                                       // 
sampling.
-                                                                       
21474700 * 100,
-                                                                       
Time.milliseconds(10L),
-                                                                       0);
-
-                                                               // Wait before 
cancelling so that some samples
-                                                               // are actually 
taken.
-                                                               
Thread.sleep(sleepTime);
-
-                                                               // Cancel job
-                                                               Future<?> 
removeFuture = jm.ask(
-                                                                               
new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
-                                                                               
remaining());
-
-                                                               jm.tell(new 
JobManagerMessages.CancelJob(jobGraph.getJobID()));
-
-                                                               try {
-                                                                       // 
Throws Exception on failure
-                                                                       
sampleFuture.get(remaining().toMillis(), TimeUnit.MILLISECONDS);
-
-                                                                       // OK, 
we are done. Got the expected
-                                                                       // 
partial result.
-                                                                       break;
-                                                               } catch 
(Throwable t) {
-                                                                       // We 
were too fast in cancelling the job.
-                                                                       // Fall 
through and retry.
-                                                               } finally {
-                                                                       
Await.ready(removeFuture, remaining());
-                                                               }
-                                                       }
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       } finally {
-                               TestingUtils.stopActor(jobManger);
-                               TestingUtils.stopActor(taskManager);
-
-                               
highAvailabilityServices.closeAndCleanupAllData();
-                       }
-               }};
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
deleted file mode 100644
index 08c4212..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
-import org.apache.flink.runtime.messages.StackTraceSampleResponse;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Test for the {@link StackTraceSampleCoordinator}.
- */
-public class StackTraceSampleCoordinatorTest extends TestLogger {
-
-       private static ActorSystem system;
-
-       private StackTraceSampleCoordinator coord;
-
-       @BeforeClass
-       public static void setUp() throws Exception {
-               system = AkkaUtils.createLocalActorSystem(new Configuration());
-       }
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (system != null) {
-                       system.shutdown();
-               }
-       }
-
-       @Before
-       public void init() throws Exception {
-               this.coord = new 
StackTraceSampleCoordinator(system.dispatcher(), 60000);
-       }
-
-       /** Tests simple trigger and collect of stack trace samples. */
-       @Test
-       public void testTriggerStackTraceSample() throws Exception {
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true)
-               };
-
-               int numSamples = 1;
-               Time delayBetweenSamples = Time.milliseconds(100L);
-               int maxStackTraceDepth = 0;
-
-               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
-                               vertices, numSamples, delayBetweenSamples, 
maxStackTraceDepth);
-
-               // Verify messages have been sent
-               for (ExecutionVertex vertex : vertices) {
-                       ExecutionAttemptID expectedExecutionId = vertex
-                                       
.getCurrentExecutionAttempt().getAttemptId();
-
-                       TriggerStackTraceSample expectedMsg = new 
TriggerStackTraceSample(
-                                       0,
-                                       expectedExecutionId,
-                                       numSamples,
-                                       delayBetweenSamples,
-                                       maxStackTraceDepth);
-
-                       verify(vertex.getCurrentExecutionAttempt())
-                               .requestStackTraceSample(eq(0), eq(numSamples), 
eq(delayBetweenSamples), eq(maxStackTraceDepth), any(Time.class));
-               }
-
-               assertFalse(sampleFuture.isDone());
-
-               StackTraceElement[] stackTraceSample = 
Thread.currentThread().getStackTrace();
-               List<StackTraceElement[]> traces = new ArrayList<>();
-               traces.add(stackTraceSample);
-               traces.add(stackTraceSample);
-               traces.add(stackTraceSample);
-
-               // Collect stack traces
-               for (int i = 0; i < vertices.length; i++) {
-                       ExecutionAttemptID executionId = 
vertices[i].getCurrentExecutionAttempt().getAttemptId();
-                       coord.collectStackTraces(0, executionId, traces);
-
-                       if (i == vertices.length - 1) {
-                               assertTrue(sampleFuture.isDone());
-                       } else {
-                               assertFalse(sampleFuture.isDone());
-                       }
-               }
-
-               // Verify completed stack trace sample
-               StackTraceSample sample = sampleFuture.get();
-
-               assertEquals(0, sample.getSampleId());
-               assertTrue(sample.getEndTime() >= sample.getStartTime());
-
-               Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask 
= sample.getStackTraces();
-
-               for (ExecutionVertex vertex : vertices) {
-                       ExecutionAttemptID executionId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
-                       List<StackTraceElement[]> sampleTraces = 
tracesByTask.get(executionId);
-
-                       assertNotNull("Task not found", sampleTraces);
-                       assertTrue(traces.equals(sampleTraces));
-               }
-
-               // Verify no more pending sample
-               assertEquals(0, coord.getNumberOfPendingSamples());
-
-               // Verify no error on late collect
-               coord.collectStackTraces(0, 
vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces);
-       }
-
-       /** Tests triggering for non-running tasks fails the future. */
-       @Test
-       public void testTriggerStackTraceSampleNotRunningTasks() throws 
Exception {
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.DEPLOYING, true)
-               };
-
-               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
-                       vertices,
-                       1,
-                       Time.milliseconds(100L),
-                       0);
-
-               assertTrue(sampleFuture.isDone());
-
-               try {
-                       sampleFuture.get();
-                       fail("Expected exception.");
-               } catch (ExecutionException e) {
-                       assertTrue(e.getCause() instanceof 
IllegalStateException);
-               }
-       }
-
-       /** Tests triggering for reset tasks fails the future. */
-       @Test(timeout = 1000L)
-       public void testTriggerStackTraceSampleResetRunningTasks() throws 
Exception {
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-                               // Fails to send the message to the execution 
(happens when execution is reset)
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, false)
-               };
-
-               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
-                       vertices,
-                       1,
-                       Time.milliseconds(100L),
-                       0);
-
-               try {
-                       sampleFuture.get();
-                       fail("Expected exception.");
-               } catch (ExecutionException e) {
-                       assertTrue(e.getCause() instanceof RuntimeException);
-               }
-       }
-
-       /** Tests that samples time out if they don't finish in time. */
-       @Test(timeout = 1000L)
-       public void testTriggerStackTraceSampleTimeout() throws Exception {
-               int timeout = 100;
-
-               coord = new StackTraceSampleCoordinator(system.dispatcher(), 
timeout);
-
-               final ScheduledExecutorService scheduledExecutorService = new 
ScheduledThreadPoolExecutor(1);
-
-               try {
-
-                       ExecutionVertex[] vertices = new ExecutionVertex[]{
-                               mockExecutionVertexWithTimeout(
-                                       new ExecutionAttemptID(),
-                                       ExecutionState.RUNNING,
-                                       scheduledExecutorService,
-                                       timeout)
-                       };
-
-                       CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
-                               vertices, 1, Time.milliseconds(100L), 0);
-
-                       // Wait for the timeout
-                       Thread.sleep(timeout * 2);
-
-                       boolean success = false;
-                       for (int i = 0; i < 10; i++) {
-                               if (sampleFuture.isDone()) {
-                                       success = true;
-                                       break;
-                               }
-
-                               Thread.sleep(timeout);
-                       }
-
-                       assertTrue("Sample did not time out", success);
-
-                       try {
-                               sampleFuture.get();
-                               fail("Expected exception.");
-                       } catch (ExecutionException e) {
-                               
assertTrue(e.getCause().getCause().getMessage().contains("Timeout"));
-                       }
-
-                       // Collect after the timeout (should be ignored)
-                       ExecutionAttemptID executionId = 
vertices[0].getCurrentExecutionAttempt().getAttemptId();
-                       coord.collectStackTraces(0, executionId, new 
ArrayList<StackTraceElement[]>());
-               } finally {
-                       scheduledExecutorService.shutdownNow();
-               }
-       }
-
-       /** Tests that collecting an unknown sample is ignored. */
-       @Test
-       public void testCollectStackTraceForUnknownSample() throws Exception {
-               coord.collectStackTraces(0, new ExecutionAttemptID(), new 
ArrayList<StackTraceElement[]>());
-       }
-
-       /** Tests cancelling of a pending sample. */
-       @Test
-       public void testCancelStackTraceSample() throws Exception {
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-               };
-
-               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
-                               vertices, 1, Time.milliseconds(100L), 0);
-
-               assertFalse(sampleFuture.isDone());
-
-               // Cancel
-               coord.cancelStackTraceSample(0, null);
-
-               // Verify completed
-               assertTrue(sampleFuture.isDone());
-
-               // Verify no more pending samples
-               assertEquals(0, coord.getNumberOfPendingSamples());
-       }
-
-       /** Tests that collecting for a cancelled sample throws no Exception. */
-       @Test
-       public void testCollectStackTraceForCanceledSample() throws Exception {
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-               };
-
-               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
-                               vertices, 1, Time.milliseconds(100L), 0);
-
-               assertFalse(sampleFuture.isDone());
-
-               coord.cancelStackTraceSample(0, null);
-
-               assertTrue(sampleFuture.isDone());
-
-               // Verify no error on late collect
-               ExecutionAttemptID executionId = 
vertices[0].getCurrentExecutionAttempt().getAttemptId();
-               coord.collectStackTraces(0, executionId, new 
ArrayList<StackTraceElement[]>());
-       }
-
-       /** Tests that collecting for a cancelled sample throws no Exception. */
-       @Test
-       public void testCollectForDiscardedPendingSample() throws Exception {
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-               };
-
-               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
-                               vertices, 1, Time.milliseconds(100L), 0);
-
-               assertFalse(sampleFuture.isDone());
-
-               coord.cancelStackTraceSample(0, null);
-
-               assertTrue(sampleFuture.isDone());
-
-               // Verify no error on late collect
-               ExecutionAttemptID executionId = 
vertices[0].getCurrentExecutionAttempt().getAttemptId();
-               coord.collectStackTraces(0, executionId, new 
ArrayList<StackTraceElement[]>());
-       }
-
-
-       /** Tests that collecting for a unknown task fails. */
-       @Test(expected = IllegalArgumentException.class)
-       public void testCollectStackTraceForUnknownTask() throws Exception {
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-               };
-
-               coord.triggerStackTraceSample(vertices, 1, 
Time.milliseconds(100L), 0);
-
-               coord.collectStackTraces(0, new ExecutionAttemptID(), new 
ArrayList<StackTraceElement[]>());
-       }
-
-       /** Tests that shut down fails all pending samples and future sample triggers. */
-       @Test
-       public void testShutDown() throws Exception {
-               ExecutionVertex[] vertices = new ExecutionVertex[] {
-                               mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
-               };
-
-               List<CompletableFuture<StackTraceSample>> sampleFutures = new 
ArrayList<>();
-
-               // Trigger
-               sampleFutures.add(coord.triggerStackTraceSample(
-                               vertices, 1, Time.milliseconds(100L), 0));
-
-               sampleFutures.add(coord.triggerStackTraceSample(
-                               vertices, 1, Time.milliseconds(100L), 0));
-
-               for (CompletableFuture<StackTraceSample> future : 
sampleFutures) {
-                       assertFalse(future.isDone());
-               }
-
-               // Shut down
-               coord.shutDown();
-
-               // Verify all completed
-               for (CompletableFuture<StackTraceSample> future : 
sampleFutures) {
-                       assertTrue(future.isDone());
-               }
-
-               // Verify new trigger returns failed future
-               CompletableFuture<StackTraceSample> future = 
coord.triggerStackTraceSample(
-                               vertices, 1, Time.milliseconds(100L), 0);
-
-               assertTrue(future.isDone());
-
-               try {
-                       future.get();
-                       fail("Expected exception.");
-               } catch (ExecutionException e) {
-                       // we expected an exception here :-)
-               }
-
-       }
-
-       // ------------------------------------------------------------------------
-
-       private ExecutionVertex mockExecutionVertex(
-                       ExecutionAttemptID executionId,
-                       ExecutionState state,
-                       boolean sendSuccess) {
-
-               Execution exec = mock(Execution.class);
-               CompletableFuture<StackTraceSampleResponse> failedFuture = new 
CompletableFuture<>();
-               failedFuture.completeExceptionally(new Exception("Send 
failed."));
-
-               when(exec.getAttemptId()).thenReturn(executionId);
-               when(exec.getState()).thenReturn(state);
-               when(exec.requestStackTraceSample(anyInt(), anyInt(), 
any(Time.class), anyInt(), any(Time.class)))
-                       .thenReturn(
-                               sendSuccess ?
-                                       
CompletableFuture.completedFuture(mock(StackTraceSampleResponse.class)) :
-                                       failedFuture);
-
-               ExecutionVertex vertex = mock(ExecutionVertex.class);
-               when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
-               when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-
-               return vertex;
-       }
-
-       private ExecutionVertex mockExecutionVertexWithTimeout(
-               ExecutionAttemptID executionId,
-               ExecutionState state,
-               ScheduledExecutorService scheduledExecutorService,
-               int timeout) {
-
-               final CompletableFuture<StackTraceSampleResponse> future = new 
CompletableFuture<>();
-
-               Execution exec = mock(Execution.class);
-               when(exec.getAttemptId()).thenReturn(executionId);
-               when(exec.getState()).thenReturn(state);
-               when(exec.requestStackTraceSample(anyInt(), anyInt(), 
any(Time.class), anyInt(), any(Time.class)))
-                       .thenReturn(future);
-
-               scheduledExecutorService.schedule(new Runnable() {
-                       @Override
-                       public void run() {
-                               future.completeExceptionally(new 
TimeoutException("Timeout"));
-                       }
-               }, timeout, TimeUnit.MILLISECONDS);
-
-               ExecutionVertex vertex = mock(ExecutionVertex.class);
-               when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
-               when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-
-               return vertex;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
deleted file mode 100644
index 0a8d9d8..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.files;
-
-import org.apache.flink.runtime.rest.handler.util.MimeTypes;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the MIME types map.
- */
-public class MimeTypesTest {
-
-       @Test
-       public void testCompleteness() {
-               try {
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("txt"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("htm"));
-                       
assertNotNull(MimeTypes.getMimeTypeForExtension("html"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("css"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("js"));
-                       
assertNotNull(MimeTypes.getMimeTypeForExtension("json"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("png"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("jpg"));
-                       
assertNotNull(MimeTypes.getMimeTypeForExtension("jpeg"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("gif"));
-                       
assertNotNull(MimeTypes.getMimeTypeForExtension("woff"));
-                       
assertNotNull(MimeTypes.getMimeTypeForExtension("woff2"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("otf"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("ttf"));
-                       assertNotNull(MimeTypes.getMimeTypeForExtension("eot"));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testFileNameExtraction() {
-               try {
-                       
assertNotNull(MimeTypes.getMimeTypeForFileName("test.txt"));
-                       
assertNotNull(MimeTypes.getMimeTypeForFileName("t.txt"));
-                       
assertNotNull(MimeTypes.getMimeTypeForFileName("first.second.third.txt"));
-
-                       assertNull(MimeTypes.getMimeTypeForFileName(".txt"));
-                       assertNull(MimeTypes.getMimeTypeForFileName("txt"));
-                       assertNull(MimeTypes.getMimeTypeForFileName("test."));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
deleted file mode 100644
index 69ee762..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Executors;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for the ClusterOverviewHandler.
- */
-public class ClusterOverviewHandlerTest {
-       @Test
-       public void testGetPaths() {
-               ClusterOverviewHandler handler = new 
ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/overview", paths[0]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
deleted file mode 100644
index 6061e4b..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Executors;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for the CurrentJobIdsHandler.
- */
-public class CurrentJobIdsHandlerTest {
-       @Test
-       public void testGetPaths() {
-               CurrentJobIdsHandler handler = new 
CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L));
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs", paths[0]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
deleted file mode 100644
index ccfafd4..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-
-/**
- * Tests for the CurrentJobsOverviewHandler.
- */
-public class CurrentJobsOverviewHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               JobDetails expectedDetails = 
WebMonitorUtils.createDetailsForJob(originalJob);
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/joboverview", archive.getPath());
-
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
-               ArrayNode running = (ArrayNode) result.get("running");
-               Assert.assertEquals(0, running.size());
-
-               ArrayNode finished = (ArrayNode) result.get("finished");
-               Assert.assertEquals(1, finished.size());
-
-               compareJobOverview(expectedDetails, finished.get(0).toString());
-       }
-
-       @Test
-       public void testGetPaths() {
-               CurrentJobsOverviewHandler handlerAll = new 
CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, 
true);
-               String[] pathsAll = handlerAll.getPaths();
-               Assert.assertEquals(1, pathsAll.length);
-               Assert.assertEquals("/joboverview", pathsAll[0]);
-
-               CurrentJobsOverviewHandler handlerRunning = new 
CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, 
false);
-               String[] pathsRunning = handlerRunning.getPaths();
-               Assert.assertEquals(1, pathsRunning.length);
-               Assert.assertEquals("/joboverview/running", pathsRunning[0]);
-
-               CurrentJobsOverviewHandler handlerCompleted = new 
CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, 
true);
-               String[] pathsCompleted = handlerCompleted.getPaths();
-               Assert.assertEquals(1, pathsCompleted.length);
-               Assert.assertEquals("/joboverview/completed", 
pathsCompleted[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               JobDetails expectedDetails = 
WebMonitorUtils.createDetailsForJob(originalJob);
-               StringWriter writer = new StringWriter();
-               try (JsonGenerator gen = 
ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
-                       
CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 
0);
-               }
-               compareJobOverview(expectedDetails, writer.toString());
-       }
-
-       private static void compareJobOverview(JobDetails expectedDetails, 
String answer) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(answer);
-
-               Assert.assertEquals(expectedDetails.getJobId().toString(), 
result.get("jid").asText());
-               Assert.assertEquals(expectedDetails.getJobName(), 
result.get("name").asText());
-               Assert.assertEquals(expectedDetails.getStatus().name(), 
result.get("state").asText());
-
-               Assert.assertEquals(expectedDetails.getStartTime(), 
result.get("start-time").asLong());
-               Assert.assertEquals(expectedDetails.getEndTime(), 
result.get("end-time").asLong());
-               Assert.assertEquals(expectedDetails.getEndTime() - 
expectedDetails.getStartTime(), result.get("duration").asLong());
-               Assert.assertEquals(expectedDetails.getLastUpdateTime(), 
result.get("last-modification").asLong());
-
-               JsonNode tasks = result.get("tasks");
-               Assert.assertEquals(expectedDetails.getNumTasks(), 
tasks.get("total").asInt());
-               int[] tasksPerState = 
expectedDetails.getNumVerticesPerExecutionState();
-               Assert.assertEquals(
-                       tasksPerState[ExecutionState.CREATED.ordinal()] + 
tasksPerState[ExecutionState.SCHEDULED.ordinal()] + 
tasksPerState[ExecutionState.DEPLOYING.ordinal()],
-                       tasks.get("pending").asInt());
-               
Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], 
tasks.get("running").asInt());
-               
Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], 
tasks.get("finished").asInt());
-               
Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], 
tasks.get("canceling").asInt());
-               
Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], 
tasks.get("canceled").asInt());
-               
Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], 
tasks.get("failed").asInt());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
deleted file mode 100644
index 22b3e5e..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.TimeZone;
-
-/**
- * Tests for the DashboardConfigHandler.
- */
-public class DashboardConfigHandlerTest {
-       @Test
-       public void testGetPaths() {
-               DashboardConfigHandler handler = new 
DashboardConfigHandler(Executors.directExecutor(), 10000L);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/config", paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               long refreshInterval = 12345;
-               TimeZone timeZone = TimeZone.getDefault();
-               EnvironmentInformation.RevisionInformation revision = 
EnvironmentInformation.getRevisionInformation();
-
-               String json = 
DashboardConfigHandler.createConfigJson(refreshInterval);
-
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(refreshInterval, 
result.get("refresh-interval").asLong());
-               Assert.assertEquals(timeZone.getDisplayName(), 
result.get("timezone-name").asText());
-               Assert.assertEquals(timeZone.getRawOffset(), 
result.get("timezone-offset").asLong());
-               Assert.assertEquals(EnvironmentInformation.getVersion(), 
result.get("flink-version").asText());
-               Assert.assertEquals(revision.commitId + " @ " + 
revision.commitDate, result.get("flink-revision").asText());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
deleted file mode 100644
index e79be96..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the HandlerRedirectUtils.
- */
-public class HandlerRedirectUtilsTest extends TestLogger {
-
-       private static final String localRestAddress = "http://127.0.0.1:1234";
-       private static final String remoteRestAddress = "http://127.0.0.2:1234";
-
-       @Test
-       public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() 
throws Exception {
-               JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-               
when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress));
-
-               CompletableFuture<Optional<String>> redirectingAddressFuture = 
HandlerRedirectUtils.getRedirectAddress(
-                       localRestAddress,
-                       jobManagerGateway,
-                       Time.seconds(3L));
-
-               Assert.assertTrue(redirectingAddressFuture.isDone());
-               // no redirection needed
-               Assert.assertFalse(redirectingAddressFuture.get().isPresent());
-       }
-
-       @Test
-       public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception 
{
-               JobManagerGateway jobManagerGateway = 
mock(AkkaJobManagerGateway.class);
-               
when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(remoteRestAddress));
-
-               CompletableFuture<Optional<String>> optRedirectingAddress = 
HandlerRedirectUtils.getRedirectAddress(
-                       localRestAddress,
-                       jobManagerGateway,
-                       Time.seconds(3L));
-
-               Assert.assertTrue(optRedirectingAddress.isDone());
-
-               Assert.assertEquals(remoteRestAddress, 
optRedirectingAddress.get().get());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
index 647e782..eb0f6b3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
@@ -19,15 +19,14 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.JarActionHandlerConfig;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 /**
  * Tests for the JarActionHandler.
@@ -49,7 +48,7 @@ public class JarActionHandlerTest {
                Map<String, String> queryParams = new HashMap<>(); // <-- everything goes here
 
                // Nothing configured
-               JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+               JarActionHandler.JarActionHandlerConfig config = JarActionHandler.JarActionHandlerConfig.fromParams(pathParams, queryParams);
                assertEquals(SavepointRestoreSettings.none(), config.getSavepointRestoreSettings());
 
                // Set path
@@ -58,14 +57,14 @@ public class JarActionHandlerTest {
 
                SavepointRestoreSettings expected = SavepointRestoreSettings.forPath("the-savepoint-path", false);
 
-               config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+               config = JarActionHandler.JarActionHandlerConfig.fromParams(pathParams, queryParams);
                assertEquals(expected, config.getSavepointRestoreSettings());
 
                // Set flag
                queryParams.put("allowNonRestoredState", "true");
 
                expected = SavepointRestoreSettings.forPath("the-savepoint-path", true);
-               config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+               config = JarActionHandler.JarActionHandlerConfig.fromParams(pathParams, queryParams);
                assertEquals(expected, config.getSavepointRestoreSettings());
        }
 
@@ -85,10 +84,10 @@ public class JarActionHandlerTest {
                queryParams.put("allowNonRestoredState", "");
 
                // Nothing configured
-               JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+               JarActionHandler.JarActionHandlerConfig config = JarActionHandler.JarActionHandlerConfig.fromParams(pathParams, queryParams);
 
                assertEquals(0, config.getProgramArgs().length);
-               assertNull(config.getEntryClass());
+               Assert.assertNull(config.getEntryClass());
                assertEquals(1, config.getParallelism());
                assertEquals(SavepointRestoreSettings.none(), config.getSavepointRestoreSettings());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
deleted file mode 100644
index 5510fed..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobAccumulatorsHandler.
- */
-public class JobAccumulatorsHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
JobAccumulatorsHandler.JobAccumulatorsJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/accumulators", archive.getPath());
-               compareAccumulators(originalJob, archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               JobAccumulatorsHandler handler = new 
JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               String json = 
JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
-
-               compareAccumulators(originalJob, json);
-       }
-
-       private static void compareAccumulators(AccessExecutionGraph 
originalJob, String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               ArrayNode accs = (ArrayNode) result.get("job-accumulators");
-               Assert.assertEquals(0, accs.size());
-
-               
Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0);
-               ArchivedJobGenerationUtils.compareStringifiedAccumulators(
-                       originalJob.getAccumulatorResultsStringified(),
-                       (ArrayNode) result.get("user-task-accumulators"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
deleted file mode 100644
index 86c5295..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * Tests for the JobCancellationHandler.
- */
-public class JobCancellationHandlerTest {
-       @Test
-       public void testGetPaths() {
-               JobCancellationHandler handler = new 
JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(2, paths.length);
-               List<String> pathsList = Lists.newArrayList(paths);
-               Assert.assertTrue(pathsList.contains("/jobs/:jobid/cancel"));
-               
Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-cancel"));
-       }
-}

Reply via email to