http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java deleted file mode 100644 index fde16fc..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler; - -import java.util.Map; -import java.util.concurrent.Executor; - -/** - * Request handler that returns for a given task manager a list of all available metrics or the values for a set of metrics. - * - * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. - * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } - * - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. - * {@code /get?X,Y} - * The handler will then return a list containing the values of the requested metrics. - * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } - */ -public class TaskManagerMetricsHandler extends AbstractMetricsHandler { - - private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics"; - - public TaskManagerMetricsHandler(Executor executor, MetricFetcher fetcher) { - super(executor, fetcher); - } - - @Override - public String[] getPaths() { - return new String[]{TASKMANAGER_METRICS_REST_PATH}; - } - - @Override - protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY)); - if (taskManager == null) { - return null; - } else { - return taskManager.metrics; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java deleted file mode 100644 index 9d71786..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.utils; - -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.metrics.MetricNames; -import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler; -import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.metrics.MetricStore; - -import com.fasterxml.jackson.core.JsonGenerator; - -import javax.annotation.Nullable; - -import java.io.IOException; - -/** - * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics. - * - * <p>For finished jobs these metrics are stored in the {@link ExecutionGraph} as another {@link IOMetrics}. - * For running jobs these metrics are retrieved using the {@link MetricFetcher}. - * - * <p>This class provides a common interface to handle both cases, reducing complexity in various handlers (like - * the {@link JobVertexDetailsHandler}). - */ -public class MutableIOMetrics extends IOMetrics { - - private static final long serialVersionUID = -5460777634971381737L; - - public MutableIOMetrics() { - super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D); - } - - /** - * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in - * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is - * used to retrieve the required metrics. - * - * @param attempt Attempt whose IO metrics should be added - * @param fetcher MetricFetcher to retrieve metrics for running jobs - * @param jobID JobID to which the attempt belongs - * @param taskID TaskID to which the attempt belongs - */ - public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) { - if (attempt.getState().isTerminal()) { - IOMetrics ioMetrics = attempt.getIOMetrics(); - if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph - this.numBytesInLocal += ioMetrics.getNumBytesInLocal(); - this.numBytesInRemote += ioMetrics.getNumBytesInRemote(); - this.numBytesOut += ioMetrics.getNumBytesOut(); - this.numRecordsIn += ioMetrics.getNumRecordsIn(); - this.numRecordsOut += ioMetrics.getNumRecordsOut(); - } - } else { // execAttempt is still running, use MetricQueryService instead - if (fetcher != null) { - fetcher.update(); - MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); - if (metrics != null) { - this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")); - this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); - this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); - this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); - this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); - } - } - } - } - - /** - * Writes the IO metrics contained in this object to the given {@link JsonGenerator}. - * - * <p>The JSON structure written is as follows: - * "metrics": { - * "read-bytes": 1, - * "write-bytes": 2, - * "read-records": 3, - * "write-records": 4 - * } - * - * @param gen JsonGenerator to which the metrics should be written - * @throws IOException - */ - public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException { - gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", this.numBytesInLocal + this.numBytesInRemote); - gen.writeNumberField("write-bytes", this.numBytesOut); - gen.writeNumberField("read-records", this.numRecordsIn); - gen.writeNumberField("write-records", this.numRecordsOut); - gen.writeEndObject(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java deleted file mode 100644 index 0e4734d..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.akka.AkkaJobManagerGateway; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.client.JobClient; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import scala.Option; -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Simple back pressured task test. - */ -public class BackPressureStatsTrackerITCase extends TestLogger { - - private static NetworkBufferPool networkBufferPool; - private static ActorSystem testActorSystem; - - /** Shared as static variable with the test task. */ - private static BufferPool testBufferPool; - - @BeforeClass - public static void setup() { - testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); - networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(testActorSystem); - networkBufferPool.destroy(); - } - - /** - * Tests a simple fake-back pressured task. Back pressure is assumed when - * sampled stack traces are in blocking buffer requests. - */ - @Test - public void testBackPressuredProducer() throws Exception { - new JavaTestKit(testActorSystem) {{ - final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS); - - // The JobGraph - final JobGraph jobGraph = new JobGraph(); - final int parallelism = 4; - - final JobVertex task = new JobVertex("Task"); - task.setInvokableClass(BackPressuredTask.class); - task.setParallelism(parallelism); - - jobGraph.addVertex(task); - - final Configuration config = new Configuration(); - - final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( - config, - TestingUtils.defaultExecutor()); - - ActorGateway jobManger = null; - ActorGateway taskManager = null; - - // - // 1) Consume all buffers at first (no buffers for the test task) - // - testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE); - final List<Buffer> buffers = new ArrayList<>(); - while (true) { - Buffer buffer = testBufferPool.requestBuffer(); - if (buffer != null) { - buffers.add(buffer); - } else { - break; - } - } - - try { - jobManger = TestingUtils.createJobManager( - testActorSystem, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - config, - highAvailabilityServices); - - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); - - taskManager = TestingUtils.createTaskManager( - testActorSystem, - highAvailabilityServices, - config, - true, - true); - - final ActorGateway jm = jobManger; - - new Within(deadline) { - @Override - protected void run() { - try { - ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); - - // Submit the job and wait until it is running - JobClient.submitJobDetached( - new AkkaJobManagerGateway(jm), - config, - jobGraph, - Time.milliseconds(deadline.toMillis()), - ClassLoader.getSystemClassLoader()); - - jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor); - - expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID())); - - // Get the ExecutionGraph - jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor); - - ExecutionGraphFound executionGraphResponse = - expectMsgClass(ExecutionGraphFound.class); - - ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph(); - ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID()); - - StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator( - testActorSystem.dispatcher(), 60000); - - // Verify back pressure (clean up interval can be ignored) - BackPressureStatsTracker statsTracker = new BackPressureStatsTracker( - coordinator, - 100 * 1000, - 20, - Time.milliseconds(10L)); - - int numAttempts = 10; - - int nextSampleId = 0; - - // Verify that all tasks are back pressured. This - // can fail if the task takes longer to request - // the buffer. - for (int attempt = 0; attempt < numAttempts; attempt++) { - try { - OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex); - - assertEquals(nextSampleId + attempt, stats.getSampleId()); - assertEquals(parallelism, stats.getNumberOfSubTasks()); - assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0); - - for (int i = 0; i < parallelism; i++) { - assertEquals(1.0, stats.getBackPressureRatio(i), 0.0); - } - - nextSampleId = stats.getSampleId() + 1; - - break; - } catch (Throwable t) { - if (attempt == numAttempts - 1) { - throw t; - } else { - Thread.sleep(500); - } - } - } - - // - // 2) Release all buffers and let the tasks grab one - // - for (Buffer buf : buffers) { - buf.recycle(); - } - - // Wait for all buffers to be available. The tasks - // grab them and then immediately release them. - while (testBufferPool.getNumberOfAvailableMemorySegments() < 100) { - Thread.sleep(100); - } - - // Verify that no task is back pressured any more. - for (int attempt = 0; attempt < numAttempts; attempt++) { - try { - OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex); - - assertEquals(nextSampleId + attempt, stats.getSampleId()); - assertEquals(parallelism, stats.getNumberOfSubTasks()); - - // Verify that no task is back pressured - for (int i = 0; i < parallelism; i++) { - assertEquals(0.0, stats.getBackPressureRatio(i), 0.0); - } - - break; - } catch (Throwable t) { - if (attempt == numAttempts - 1) { - throw t; - } else { - Thread.sleep(500); - } - } - } - - // Shut down - jm.tell(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor); - - // Cancel job - jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); - - // Response to removal notification - expectMsgEquals(true); - - // - // 3) Trigger stats for archived job - // - statsTracker.invalidateOperatorStatsCache(); - assertFalse("Unexpected trigger", statsTracker.triggerStackTraceSample(vertex)); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - }; - } finally { - TestingUtils.stopActor(jobManger); - TestingUtils.stopActor(taskManager); - - highAvailabilityServices.closeAndCleanupAllData(); - - for (Buffer buf : buffers) { - buf.recycle(); - } - - testBufferPool.lazyDestroy(); - } - }}; - } - - /** - * Triggers a new stats sample. - */ - private OperatorBackPressureStats triggerStatsSample( - BackPressureStatsTracker statsTracker, - ExecutionJobVertex vertex) throws InterruptedException { - - statsTracker.invalidateOperatorStatsCache(); - assertTrue("Failed to trigger", statsTracker.triggerStackTraceSample(vertex)); - - // Sleep minimum duration - Thread.sleep(20 * 10); - - Option<OperatorBackPressureStats> stats; - - // Get the stats - while ((stats = statsTracker.getOperatorBackPressureStats(vertex)).isEmpty()) { - Thread.sleep(10); - } - - return stats.get(); - } - - /** - * A back pressured producer sharing a {@link BufferPool} with the - * test driver. - */ - public static class BackPressuredTask extends AbstractInvokable { - - @Override - public void invoke() throws Exception { - while (true) { - Buffer buffer = testBufferPool.requestBufferBlocking(); - // Got a buffer, yay! - buffer.recycle(); - - new CountDownLatch(1).await(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java deleted file mode 100644 index e99d1b7..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests for the BackPressureStatsTracker. - */ -public class BackPressureStatsTrackerTest extends TestLogger { - - /** Tests simple statistics with fake stack traces. */ - @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture<StackTraceSample> sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = mock(StackTraceSampleCoordinator.class); - when(sampleCoordinator.triggerStackTraceSample( - any(ExecutionVertex[].class), - anyInt(), - any(Time.class), - anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = mock(ExecutionGraph.class); - when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class); - when(jobVertex.getJobId()).thenReturn(new JobID()); - when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - when(jobVertex.getGraph()).thenReturn(graph); - when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTracker tracker = new BackPressureStatsTracker( - sampleCoordinator, 9999, numSamples, delayBetweenSamples); - - // Trigger - assertTrue("Failed to trigger", tracker.triggerStackTraceSample(jobVertex)); - - verify(sampleCoordinator).triggerStackTraceSample( - eq(taskVertices), - eq(numSamples), - eq(delayBetweenSamples), - eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH)); - - // Trigger again for pending request, should not fire - assertFalse("Unexpected trigger", tracker.triggerStackTraceSample(jobVertex)); - - assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty()); - - verify(sampleCoordinator).triggerStackTraceSample( - eq(taskVertices), - eq(numSamples), - eq(delayBetweenSamples), - eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH)); - - assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty()); - - // Complete the future - Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new HashMap<>(); - for (ExecutionVertex vertex : taskVertices) { - List<StackTraceElement[]> taskTraces = new ArrayList<>(); - - for (int i = 0; i < taskVertices.length; i++) { - // Traces until sub task index are back pressured - taskTraces.add(createStackTrace(i <= vertex.getParallelSubtaskIndex())); - } - - traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces); - } - - int sampleId = 1231; - int endTime = 841; - - StackTraceSample sample = new StackTraceSample( - sampleId, - 0, - endTime, - traces); - - // Succeed the promise - sampleFuture.complete(sample); - - assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isDefined()); - - OperatorBackPressureStats stats = tracker.getOperatorBackPressureStats(jobVertex).get(); - - // Verify the stats - assertEquals(sampleId, stats.getSampleId()); - assertEquals(endTime, stats.getEndTimestamp()); - assertEquals(taskVertices.length, stats.getNumberOfSubTasks()); - - for (int i = 0; i < taskVertices.length; i++) { - double ratio = stats.getBackPressureRatio(i); - // Traces until sub task index are back pressured - assertEquals((i + 1) / ((double) 4), ratio, 0.0); - } - } - - private StackTraceElement[] createStackTrace(boolean isBackPressure) { - if (isBackPressure) { - return new StackTraceElement[] { new StackTraceElement( - BackPressureStatsTracker.EXPECTED_CLASS_NAME, - BackPressureStatsTracker.EXPECTED_METHOD_NAME, - "LocalBufferPool.java", - 133) }; - } else { - return Thread.currentThread().getStackTrace(); - } - } - - private ExecutionVertex mockExecutionVertex( - ExecutionJobVertex jobVertex, - int subTaskIndex) { - - Execution exec = mock(Execution.class); - when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID()); - - JobVertexID id = jobVertex.getJobVertexId(); - - ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getJobvertexId()).thenReturn(id); - when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); - when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex); - - return vertex; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java deleted file mode 100644 index bd12668..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaJobManagerGateway; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.client.JobClient; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; -import static org.junit.Assert.fail; - -/** - * Simple stack trace sampling test. - */ -public class StackTraceSampleCoordinatorITCase extends TestLogger { - - private static ActorSystem testActorSystem; - - @BeforeClass - public static void setup() { - testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(testActorSystem); - } - - /** - * Tests that a cleared task is answered with a partial success response. - */ - @Test - public void testTaskClearedWhileSampling() throws Exception { - new JavaTestKit(testActorSystem) {{ - final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS); - - // The JobGraph - final JobGraph jobGraph = new JobGraph(); - final int parallelism = 1; - - final JobVertex task = new JobVertex("Task"); - task.setInvokableClass(BlockingNoOpInvokable.class); - task.setParallelism(parallelism); - - jobGraph.addVertex(task); - - final Configuration config = new Configuration(); - - final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( - config, - TestingUtils.defaultExecutor()); - - ActorGateway jobManger = null; - ActorGateway taskManager = null; - - try { - jobManger = TestingUtils.createJobManager( - testActorSystem, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - config, - highAvailabilityServices); - - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); - - taskManager = TestingUtils.createTaskManager( - testActorSystem, - highAvailabilityServices, - config, - true, - true); - - final ActorGateway jm = jobManger; - - new Within(deadline) { - @Override - protected void run() { - try { - ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); - - int maxAttempts = 10; - int sleepTime = 100; - - for (int i = 0; i < maxAttempts; i++, sleepTime *= 2) { - // Submit the job and wait until it is running - JobClient.submitJobDetached( - new AkkaJobManagerGateway(jm), - config, - jobGraph, - Time.milliseconds(deadline.toMillis()), - ClassLoader.getSystemClassLoader()); - - jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor); - - expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID())); - - // Get the ExecutionGraph - jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor); - ExecutionGraphFound executionGraphResponse = - expectMsgClass(ExecutionGraphFound.class); - ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph(); - ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID()); - - StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator( - testActorSystem.dispatcher(), 60000); - - CompletableFuture<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample( - vertex.getTaskVertices(), - // Do this often so we have a good - // chance of removing the job during - // sampling. - 21474700 * 100, - Time.milliseconds(10L), - 0); - - // Wait before cancelling so that some samples - // are actually taken. - Thread.sleep(sleepTime); - - // Cancel job - Future<?> removeFuture = jm.ask( - new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), - remaining()); - - jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); - - try { - // Throws Exception on failure - sampleFuture.get(remaining().toMillis(), TimeUnit.MILLISECONDS); - - // OK, we are done. Got the expected - // partial result. - break; - } catch (Throwable t) { - // We were too fast in cancelling the job. - // Fall through and retry. - } finally { - Await.ready(removeFuture, remaining()); - } - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - }; - } finally { - TestingUtils.stopActor(jobManger); - TestingUtils.stopActor(taskManager); - - highAvailabilityServices.closeAndCleanupAllData(); - } - }}; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java deleted file mode 100644 index 08c4212..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java +++ /dev/null @@ -1,441 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample; -import org.apache.flink.runtime.messages.StackTraceSampleResponse; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorSystem; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Test for the {@link StackTraceSampleCoordinator}. - */ -public class StackTraceSampleCoordinatorTest extends TestLogger { - - private static ActorSystem system; - - private StackTraceSampleCoordinator coord; - - @BeforeClass - public static void setUp() throws Exception { - system = AkkaUtils.createLocalActorSystem(new Configuration()); - } - - @AfterClass - public static void tearDown() throws Exception { - if (system != null) { - system.shutdown(); - } - } - - @Before - public void init() throws Exception { - this.coord = new StackTraceSampleCoordinator(system.dispatcher(), 60000); - } - - /** Tests simple trigger and collect of stack trace samples. */ - @Test - public void testTriggerStackTraceSample() throws Exception { - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true) - }; - - int numSamples = 1; - Time delayBetweenSamples = Time.milliseconds(100L); - int maxStackTraceDepth = 0; - - CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( - vertices, numSamples, delayBetweenSamples, maxStackTraceDepth); - - // Verify messages have been sent - for (ExecutionVertex vertex : vertices) { - ExecutionAttemptID expectedExecutionId = vertex - .getCurrentExecutionAttempt().getAttemptId(); - - TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample( - 0, - expectedExecutionId, - numSamples, - delayBetweenSamples, - maxStackTraceDepth); - - verify(vertex.getCurrentExecutionAttempt()) - .requestStackTraceSample(eq(0), eq(numSamples), eq(delayBetweenSamples), eq(maxStackTraceDepth), any(Time.class)); - } - - assertFalse(sampleFuture.isDone()); - - StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace(); - List<StackTraceElement[]> traces = new ArrayList<>(); - traces.add(stackTraceSample); - traces.add(stackTraceSample); - traces.add(stackTraceSample); - - // Collect stack traces - for (int i = 0; i < vertices.length; i++) { - ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId(); - coord.collectStackTraces(0, executionId, traces); - - if (i == vertices.length - 1) { - assertTrue(sampleFuture.isDone()); - } else { - assertFalse(sampleFuture.isDone()); - } - } - - // Verify completed stack trace sample - StackTraceSample sample = sampleFuture.get(); - - assertEquals(0, sample.getSampleId()); - assertTrue(sample.getEndTime() >= sample.getStartTime()); - - Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces(); - - for (ExecutionVertex vertex : vertices) { - ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId(); - List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId); - - assertNotNull("Task not found", sampleTraces); - assertTrue(traces.equals(sampleTraces)); - } - - // Verify no more pending sample - assertEquals(0, coord.getNumberOfPendingSamples()); - - // Verify no error on late collect - coord.collectStackTraces(0, vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces); - } - - /** Tests triggering for non-running tasks fails the future. */ - @Test - public void testTriggerStackTraceSampleNotRunningTasks() throws Exception { - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true) - }; - - CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( - vertices, - 1, - Time.milliseconds(100L), - 0); - - assertTrue(sampleFuture.isDone()); - - try { - sampleFuture.get(); - fail("Expected exception."); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof IllegalStateException); - } - } - - /** Tests triggering for reset tasks fails the future. */ - @Test(timeout = 1000L) - public void testTriggerStackTraceSampleResetRunningTasks() throws Exception { - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - // Fails to send the message to the execution (happens when execution is reset) - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false) - }; - - CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( - vertices, - 1, - Time.milliseconds(100L), - 0); - - try { - sampleFuture.get(); - fail("Expected exception."); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof RuntimeException); - } - } - - /** Tests that samples time out if they don't finish in time. */ - @Test(timeout = 1000L) - public void testTriggerStackTraceSampleTimeout() throws Exception { - int timeout = 100; - - coord = new StackTraceSampleCoordinator(system.dispatcher(), timeout); - - final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); - - try { - - ExecutionVertex[] vertices = new ExecutionVertex[]{ - mockExecutionVertexWithTimeout( - new ExecutionAttemptID(), - ExecutionState.RUNNING, - scheduledExecutorService, - timeout) - }; - - CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( - vertices, 1, Time.milliseconds(100L), 0); - - // Wait for the timeout - Thread.sleep(timeout * 2); - - boolean success = false; - for (int i = 0; i < 10; i++) { - if (sampleFuture.isDone()) { - success = true; - break; - } - - Thread.sleep(timeout); - } - - assertTrue("Sample did not time out", success); - - try { - sampleFuture.get(); - fail("Expected exception."); - } catch (ExecutionException e) { - assertTrue(e.getCause().getCause().getMessage().contains("Timeout")); - } - - // Collect after the timeout (should be ignored) - ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId(); - coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>()); - } finally { - scheduledExecutorService.shutdownNow(); - } - } - - /** Tests that collecting an unknown sample is ignored. */ - @Test - public void testCollectStackTraceForUnknownSample() throws Exception { - coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>()); - } - - /** Tests cancelling of a pending sample. */ - @Test - public void testCancelStackTraceSample() throws Exception { - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - }; - - CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( - vertices, 1, Time.milliseconds(100L), 0); - - assertFalse(sampleFuture.isDone()); - - // Cancel - coord.cancelStackTraceSample(0, null); - - // Verify completed - assertTrue(sampleFuture.isDone()); - - // Verify no more pending samples - assertEquals(0, coord.getNumberOfPendingSamples()); - } - - /** Tests that collecting for a cancelled sample throws no Exception. */ - @Test - public void testCollectStackTraceForCanceledSample() throws Exception { - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - }; - - CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( - vertices, 1, Time.milliseconds(100L), 0); - - assertFalse(sampleFuture.isDone()); - - coord.cancelStackTraceSample(0, null); - - assertTrue(sampleFuture.isDone()); - - // Verify no error on late collect - ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId(); - coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>()); - } - - /** Tests that collecting for a cancelled sample throws no Exception. */ - @Test - public void testCollectForDiscardedPendingSample() throws Exception { - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - }; - - CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( - vertices, 1, Time.milliseconds(100L), 0); - - assertFalse(sampleFuture.isDone()); - - coord.cancelStackTraceSample(0, null); - - assertTrue(sampleFuture.isDone()); - - // Verify no error on late collect - ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId(); - coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>()); - } - - - /** Tests that collecting for a unknown task fails. */ - @Test(expected = IllegalArgumentException.class) - public void testCollectStackTraceForUnknownTask() throws Exception { - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - }; - - coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0); - - coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>()); - } - - /** Tests that shut down fails all pending samples and future sample triggers. */ - @Test - public void testShutDown() throws Exception { - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), - }; - - List<CompletableFuture<StackTraceSample>> sampleFutures = new ArrayList<>(); - - // Trigger - sampleFutures.add(coord.triggerStackTraceSample( - vertices, 1, Time.milliseconds(100L), 0)); - - sampleFutures.add(coord.triggerStackTraceSample( - vertices, 1, Time.milliseconds(100L), 0)); - - for (CompletableFuture<StackTraceSample> future : sampleFutures) { - assertFalse(future.isDone()); - } - - // Shut down - coord.shutDown(); - - // Verify all completed - for (CompletableFuture<StackTraceSample> future : sampleFutures) { - assertTrue(future.isDone()); - } - - // Verify new trigger returns failed future - CompletableFuture<StackTraceSample> future = coord.triggerStackTraceSample( - vertices, 1, Time.milliseconds(100L), 0); - - assertTrue(future.isDone()); - - try { - future.get(); - fail("Expected exception."); - } catch (ExecutionException e) { - // we expected an exception here :-) - } - - } - - // ------------------------------------------------------------------------ - - private ExecutionVertex mockExecutionVertex( - ExecutionAttemptID executionId, - ExecutionState state, - boolean sendSuccess) { - - Execution exec = mock(Execution.class); - CompletableFuture<StackTraceSampleResponse> failedFuture = new CompletableFuture<>(); - failedFuture.completeExceptionally(new Exception("Send failed.")); - - when(exec.getAttemptId()).thenReturn(executionId); - when(exec.getState()).thenReturn(state); - when(exec.requestStackTraceSample(anyInt(), anyInt(), any(Time.class), anyInt(), any(Time.class))) - .thenReturn( - sendSuccess ? - CompletableFuture.completedFuture(mock(StackTraceSampleResponse.class)) : - failedFuture); - - ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); - when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); - - return vertex; - } - - private ExecutionVertex mockExecutionVertexWithTimeout( - ExecutionAttemptID executionId, - ExecutionState state, - ScheduledExecutorService scheduledExecutorService, - int timeout) { - - final CompletableFuture<StackTraceSampleResponse> future = new CompletableFuture<>(); - - Execution exec = mock(Execution.class); - when(exec.getAttemptId()).thenReturn(executionId); - when(exec.getState()).thenReturn(state); - when(exec.requestStackTraceSample(anyInt(), anyInt(), any(Time.class), anyInt(), any(Time.class))) - .thenReturn(future); - - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - future.completeExceptionally(new TimeoutException("Timeout")); - } - }, timeout, TimeUnit.MILLISECONDS); - - ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); - when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); - - return vertex; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java deleted file mode 100644 index 0a8d9d8..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.files; - -import org.apache.flink.runtime.rest.handler.util.MimeTypes; - -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -/** - * Tests for the MIME types map. - */ -public class MimeTypesTest { - - @Test - public void testCompleteness() { - try { - assertNotNull(MimeTypes.getMimeTypeForExtension("txt")); - assertNotNull(MimeTypes.getMimeTypeForExtension("htm")); - assertNotNull(MimeTypes.getMimeTypeForExtension("html")); - assertNotNull(MimeTypes.getMimeTypeForExtension("css")); - assertNotNull(MimeTypes.getMimeTypeForExtension("js")); - assertNotNull(MimeTypes.getMimeTypeForExtension("json")); - assertNotNull(MimeTypes.getMimeTypeForExtension("png")); - assertNotNull(MimeTypes.getMimeTypeForExtension("jpg")); - assertNotNull(MimeTypes.getMimeTypeForExtension("jpeg")); - assertNotNull(MimeTypes.getMimeTypeForExtension("gif")); - assertNotNull(MimeTypes.getMimeTypeForExtension("woff")); - assertNotNull(MimeTypes.getMimeTypeForExtension("woff2")); - assertNotNull(MimeTypes.getMimeTypeForExtension("otf")); - assertNotNull(MimeTypes.getMimeTypeForExtension("ttf")); - assertNotNull(MimeTypes.getMimeTypeForExtension("eot")); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testFileNameExtraction() { - try { - assertNotNull(MimeTypes.getMimeTypeForFileName("test.txt")); - assertNotNull(MimeTypes.getMimeTypeForFileName("t.txt")); - assertNotNull(MimeTypes.getMimeTypeForFileName("first.second.third.txt")); - - assertNull(MimeTypes.getMimeTypeForFileName(".txt")); - assertNull(MimeTypes.getMimeTypeForFileName("txt")); - assertNull(MimeTypes.getMimeTypeForFileName("test.")); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java deleted file mode 100644 index 69ee762..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.Executors; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Tests for the ClusterOverviewHandler. - */ -public class ClusterOverviewHandlerTest { - @Test - public void testGetPaths() { - ClusterOverviewHandler handler = new ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L)); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/overview", paths[0]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java deleted file mode 100644 index 6061e4b..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.Executors; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Tests for the CurrentJobIdsHandler. - */ -public class CurrentJobIdsHandlerTest { - @Test - public void testGetPaths() { - CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L)); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs", paths[0]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java deleted file mode 100644 index ccfafd4..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.webmonitor.WebMonitorUtils; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.Collection; - -/** - * Tests for the CurrentJobsOverviewHandler. - */ -public class CurrentJobsOverviewHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/joboverview", archive.getPath()); - - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson()); - ArrayNode running = (ArrayNode) result.get("running"); - Assert.assertEquals(0, running.size()); - - ArrayNode finished = (ArrayNode) result.get("finished"); - Assert.assertEquals(1, finished.size()); - - compareJobOverview(expectedDetails, finished.get(0).toString()); - } - - @Test - public void testGetPaths() { - CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, true); - String[] pathsAll = handlerAll.getPaths(); - Assert.assertEquals(1, pathsAll.length); - Assert.assertEquals("/joboverview", pathsAll[0]); - - CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, false); - String[] pathsRunning = handlerRunning.getPaths(); - Assert.assertEquals(1, pathsRunning.length); - Assert.assertEquals("/joboverview/running", pathsRunning[0]); - - CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, true); - String[] pathsCompleted = handlerCompleted.getPaths(); - Assert.assertEquals(1, pathsCompleted.length); - Assert.assertEquals("/joboverview/completed", pathsCompleted[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob); - StringWriter writer = new StringWriter(); - try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) { - CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0); - } - compareJobOverview(expectedDetails, writer.toString()); - } - - private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(answer); - - Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText()); - Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText()); - Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText()); - - Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong()); - Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong()); - Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong()); - Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong()); - - JsonNode tasks = result.get("tasks"); - Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt()); - int[] tasksPerState = expectedDetails.getNumVerticesPerExecutionState(); - Assert.assertEquals( - tasksPerState[ExecutionState.CREATED.ordinal()] + tasksPerState[ExecutionState.SCHEDULED.ordinal()] + tasksPerState[ExecutionState.DEPLOYING.ordinal()], - tasks.get("pending").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], tasks.get("running").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], tasks.get("finished").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], tasks.get("canceling").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], tasks.get("canceled").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], tasks.get("failed").asInt()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java deleted file mode 100644 index 22b3e5e..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import org.junit.Assert; -import org.junit.Test; - -import java.util.TimeZone; - -/** - * Tests for the DashboardConfigHandler. - */ -public class DashboardConfigHandlerTest { - @Test - public void testGetPaths() { - DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/config", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - long refreshInterval = 12345; - TimeZone timeZone = TimeZone.getDefault(); - EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); - - String json = DashboardConfigHandler.createConfigJson(refreshInterval); - - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong()); - Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText()); - Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong()); - Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText()); - Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java deleted file mode 100644 index e79be96..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.akka.AkkaJobManagerGateway; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests for the HandlerRedirectUtils. - */ -public class HandlerRedirectUtilsTest extends TestLogger { - - private static final String localRestAddress = "http://127.0.0.1:1234"; - private static final String remoteRestAddress = "http://127.0.0.2:1234"; - - @Test - public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() throws Exception { - JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress)); - - CompletableFuture<Optional<String>> redirectingAddressFuture = HandlerRedirectUtils.getRedirectAddress( - localRestAddress, - jobManagerGateway, - Time.seconds(3L)); - - Assert.assertTrue(redirectingAddressFuture.isDone()); - // no redirection needed - Assert.assertFalse(redirectingAddressFuture.get().isPresent()); - } - - @Test - public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception { - JobManagerGateway jobManagerGateway = mock(AkkaJobManagerGateway.class); - when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(remoteRestAddress)); - - CompletableFuture<Optional<String>> optRedirectingAddress = HandlerRedirectUtils.getRedirectAddress( - localRestAddress, - jobManagerGateway, - Time.seconds(3L)); - - Assert.assertTrue(optRedirectingAddress.isDone()); - - Assert.assertEquals(remoteRestAddress, optRedirectingAddress.get().get()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java index 647e782..eb0f6b3 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java @@ -19,15 +19,14 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.JarActionHandlerConfig; +import org.junit.Assert; import org.junit.Test; import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; /** * Tests for the JarActionHandler. @@ -49,7 +48,7 @@ public class JarActionHandlerTest { Map<String, String> queryParams = new HashMap<>(); // <-- everything goes here // Nothing configured - JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams); + JarActionHandler.JarActionHandlerConfig config = JarActionHandler.JarActionHandlerConfig.fromParams(pathParams, queryParams); assertEquals(SavepointRestoreSettings.none(), config.getSavepointRestoreSettings()); // Set path @@ -58,14 +57,14 @@ public class JarActionHandlerTest { SavepointRestoreSettings expected = SavepointRestoreSettings.forPath("the-savepoint-path", false); - config = JarActionHandlerConfig.fromParams(pathParams, queryParams); + config = JarActionHandler.JarActionHandlerConfig.fromParams(pathParams, queryParams); assertEquals(expected, config.getSavepointRestoreSettings()); // Set flag queryParams.put("allowNonRestoredState", "true"); expected = SavepointRestoreSettings.forPath("the-savepoint-path", true); - config = JarActionHandlerConfig.fromParams(pathParams, queryParams); + config = JarActionHandler.JarActionHandlerConfig.fromParams(pathParams, queryParams); assertEquals(expected, config.getSavepointRestoreSettings()); } @@ -85,10 +84,10 @@ public class JarActionHandlerTest { queryParams.put("allowNonRestoredState", ""); // Nothing configured - JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams); + JarActionHandler.JarActionHandlerConfig config = JarActionHandler.JarActionHandlerConfig.fromParams(pathParams, queryParams); assertEquals(0, config.getProgramArgs().length); - assertNull(config.getEntryClass()); + Assert.assertNull(config.getEntryClass()); assertEquals(1, config.getParallelism()); assertEquals(SavepointRestoreSettings.none(), config.getSavepointRestoreSettings()); } http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java deleted file mode 100644 index 5510fed..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the JobAccumulatorsHandler. - */ -public class JobAccumulatorsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/accumulators", archive.getPath()); - compareAccumulators(originalJob, archive.getJson()); - } - - @Test - public void testGetPaths() { - JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob); - - compareAccumulators(originalJob, json); - } - - private static void compareAccumulators(AccessExecutionGraph originalJob, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - ArrayNode accs = (ArrayNode) result.get("job-accumulators"); - Assert.assertEquals(0, accs.size()); - - Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0); - ArchivedJobGenerationUtils.compareStringifiedAccumulators( - originalJob.getAccumulatorResultsStringified(), - (ArrayNode) result.get("user-task-accumulators")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java deleted file mode 100644 index 86c5295..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.testingUtils.TestingUtils; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.List; - -/** - * Tests for the JobCancellationHandler. - */ -public class JobCancellationHandlerTest { - @Test - public void testGetPaths() { - JobCancellationHandler handler = new JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT()); - String[] paths = handler.getPaths(); - Assert.assertEquals(2, paths.length); - List<String> pathsList = Lists.newArrayList(paths); - Assert.assertTrue(pathsList.contains("/jobs/:jobid/cancel")); - Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-cancel")); - } -}
