/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.TestLogger;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;

import static org.mockito.Mockito.mock;

/**
 * Tests for the {@link SubtasksTimesHandler}: archived JSON generation, the
 * registered REST paths, and the JSON payload produced for a task's subtasks.
 */
public class SubtasksTimesHandlerTest extends TestLogger {

	@Test
	public void testArchiver() throws Exception {
		final JsonArchivist archivist = new SubtasksTimesHandler.SubtasksTimesJsonArchivist();
		final AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
		final AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
		final AccessExecution attempt = ArchivedJobGenerationUtils.getTestAttempt();

		final Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(job);
		Assert.assertEquals(1, archives.size());

		final ArchivedJson archive = archives.iterator().next();
		final String expectedPath =
			"/jobs/" + job.getJobID() + "/vertices/" + task.getJobVertexId() + "/subtasktimes";
		Assert.assertEquals(expectedPath, archive.getPath());
		compareSubtaskTimes(task, attempt, archive.getJson());
	}

	@Test
	public void testGetPaths() {
		final SubtasksTimesHandler handler =
			new SubtasksTimesHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());

		final String[] paths = handler.getPaths();
		Assert.assertEquals(1, paths.length);
		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);
	}

	@Test
	public void testJsonGeneration() throws Exception {
		final AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
		final AccessExecution attempt = ArchivedJobGenerationUtils.getTestAttempt();

		final String json = SubtasksTimesHandler.createSubtaskTimesJson(task);
		compareSubtaskTimes(task, attempt, json);
	}

	/**
	 * Checks that the given JSON matches the expected per-subtask timing
	 * information derived from the given task and execution attempt.
	 */
	private static void compareSubtaskTimes(
			AccessExecutionJobVertex task,
			AccessExecution attempt,
			String json) throws IOException {

		final JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);

		Assert.assertEquals(task.getJobVertexId().toString(), result.get("id").asText());
		Assert.assertEquals(task.getName(), result.get("name").asText());
		Assert.assertTrue(result.get("now").asLong() > 0L);

		final ArrayNode subtasks = (ArrayNode) result.get("subtasks");
		final JsonNode subtask = subtasks.get(0);

		Assert.assertEquals(0, subtask.get("subtask").asInt());
		Assert.assertEquals(
			attempt.getAssignedResourceLocation().getHostname(),
			subtask.get("host").asText());

		// Duration is measured from SCHEDULED to the attempt's current state.
		final long expectedDuration =
			attempt.getStateTimestamp(attempt.getState()) - attempt.getStateTimestamp(ExecutionState.SCHEDULED);
		Assert.assertEquals(expectedDuration, subtask.get("duration").asLong());

		final JsonNode timestamps = subtask.get("timestamps");

		// Every lifecycle state must be reported with the attempt's timestamp.
		for (ExecutionState state : new ExecutionState[] {
				ExecutionState.CREATED,
				ExecutionState.SCHEDULED,
				ExecutionState.DEPLOYING,
				ExecutionState.RUNNING,
				ExecutionState.FINISHED,
				ExecutionState.CANCELING,
				ExecutionState.CANCELED,
				ExecutionState.FAILED}) {
			Assert.assertEquals(
				attempt.getStateTimestamp(state),
				timestamps.get(state.name()).asLong());
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

/**
 * Tests for the {@link TaskManagerLogHandler}: registered REST paths for both
 * file modes, and error reporting when fetching a TaskManager log fails.
 */
public class TaskManagerLogHandlerTest {

	@Test
	public void testGetPaths() {
		final TaskManagerLogHandler logHandler =
			createHandler(mock(GatewayRetriever.class), TaskManagerLogHandler.FileMode.LOG);
		final String[] logPaths = logHandler.getPaths();
		Assert.assertEquals(1, logPaths.length);
		Assert.assertEquals("/taskmanagers/:taskmanagerid/log", logPaths[0]);

		final TaskManagerLogHandler stdoutHandler =
			createHandler(mock(GatewayRetriever.class), TaskManagerLogHandler.FileMode.STDOUT);
		final String[] stdoutPaths = stdoutHandler.getPaths();
		Assert.assertEquals(1, stdoutPaths.length);
		Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", stdoutPaths[0]);
	}

	@Test
	public void testLogFetchingFailure() throws Exception {
		// ========= setup TaskManager =================================================================================
		final InstanceID taskManagerId = new InstanceID();
		final ResourceID taskManagerResourceId = new ResourceID(taskManagerId.toString());

		final TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");

		final Instance taskManager = mock(Instance.class);
		when(taskManager.getId()).thenReturn(taskManagerId);
		when(taskManager.getTaskManagerID()).thenReturn(taskManagerResourceId);
		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);

		// The log request fails with an IOException.
		final CompletableFuture<BlobKey> failedLogFuture = new CompletableFuture<>();
		failedLogFuture.completeExceptionally(new IOException("failure"));
		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(failedLogFuture);

		// ========= setup JobManager ==================================================================================
		final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
		when(jobManagerGateway.requestBlobServerPort(any(Time.class)))
			.thenReturn(CompletableFuture.completedFuture(1337));
		when(jobManagerGateway.getHostname()).thenReturn("localhost");
		when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class)))
			.thenReturn(CompletableFuture.completedFuture(Optional.of(taskManager)));

		final GatewayRetriever<JobManagerGateway> retriever = mock(GatewayRetriever.class);
		when(retriever.getNow()).thenReturn(Optional.of(jobManagerGateway));

		final TaskManagerLogHandler handler =
			createHandler(retriever, TaskManagerLogHandler.FileMode.LOG);

		// Capture whatever the handler writes back to the channel.
		final AtomicReference<String> writtenError = new AtomicReference<>();
		final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
		when(ctx.write(isA(ByteBuf.class))).thenAnswer(invocation -> {
			ByteBuf data = invocation.getArgumentAt(0, ByteBuf.class);
			writtenError.set(new String(data.array(), ConfigConstants.DEFAULT_CHARSET));
			return null;
		});

		final Map<String, String> pathParams = new HashMap<>();
		pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, taskManagerId.toString());

		final Routed routed = mock(Routed.class);
		when(routed.pathParams()).thenReturn(pathParams);
		when(routed.request()).thenReturn(new DefaultFullHttpRequest(
			HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + taskManagerId + "/log"));

		handler.respondAsLeader(ctx, routed, jobManagerGateway);

		Assert.assertEquals("Fetching TaskManager log failed.", writtenError.get());
	}

	/** Creates a handler for the given file mode with test defaults. */
	private static TaskManagerLogHandler createHandler(
			GatewayRetriever<JobManagerGateway> retriever,
			TaskManagerLogHandler.FileMode fileMode) {

		return new TaskManagerLogHandler(
			retriever,
			Executors.directExecutor(),
			CompletableFuture.completedFuture("/jm/address"),
			TestingUtils.TIMEOUT(),
			fileMode,
			new Configuration(),
			new VoidBlobStore());
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.Executors;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import org.junit.Assert;
import org.junit.Test;

import java.util.List;

/**
 * Tests for the {@link TaskManagersHandler}: verifies the handler registers
 * exactly its two expected REST paths.
 */
public class TaskManagersHandlerTest {

	@Test
	public void testGetPaths() {
		final TaskManagersHandler handler =
			new TaskManagersHandler(Executors.directExecutor(), Time.seconds(0L), null);

		final String[] registeredPaths = handler.getPaths();
		Assert.assertEquals(2, registeredPaths.length);

		// Order of the two paths is not part of the contract, so check membership.
		final List<String> paths = Lists.newArrayList(registeredPaths);
		Assert.assertTrue(paths.contains("/taskmanagers"));
		Assert.assertTrue(paths.contains("/taskmanagers/:taskmanagerid"));
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.FiniteDuration;

import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;

/**
 * Simple back pressured task test.
 *
 * <p>Drains a shared {@link BufferPool} so that {@link BackPressuredTask}
 * instances block inside {@code requestBufferBlocking()}, then verifies that
 * the {@link BackPressureStatsTracker} reports full back pressure; after the
 * buffers are released it verifies the reported ratios drop to zero.
 */
public class BackPressureStatsTrackerITCase extends TestLogger {

	private static NetworkBufferPool networkBufferPool;
	private static ActorSystem testActorSystem;

	/** Shared as static variable with the test task. */
	private static BufferPool testBufferPool;

	@BeforeClass
	public static void setup() {
		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
		// 100 segments of 8 KiB each; the test later waits for all 100 to return.
		networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP);
	}

	@AfterClass
	public static void teardown() {
		JavaTestKit.shutdownActorSystem(testActorSystem);
		networkBufferPool.destroy();
	}

	/**
	 * Tests a simple fake-back pressured task. Back pressure is assumed when
	 * sampled stack traces are in blocking buffer requests.
	 */
	@Test
	public void testBackPressuredProducer() throws Exception {
		new JavaTestKit(testActorSystem) {{
			final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);

			// The JobGraph
			final JobGraph jobGraph = new JobGraph();
			final int parallelism = 4;

			final JobVertex task = new JobVertex("Task");
			task.setInvokableClass(BackPressuredTask.class);
			task.setParallelism(parallelism);

			jobGraph.addVertex(task);

			final Configuration config = new Configuration();

			final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
				config,
				TestingUtils.defaultExecutor());

			// NOTE(review): 'jobManger' is a typo for 'jobManager'; kept as-is.
			ActorGateway jobManger = null;
			ActorGateway taskManager = null;

			//
			// 1) Consume all buffers at first (no buffers for the test task)
			//
			testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
			final List<Buffer> buffers = new ArrayList<>();
			// Drain the pool completely so the tasks block on their first request.
			while (true) {
				Buffer buffer = testBufferPool.requestBuffer();
				if (buffer != null) {
					buffers.add(buffer);
				} else {
					break;
				}
			}

			try {
				jobManger = TestingUtils.createJobManager(
					testActorSystem,
					TestingUtils.defaultExecutor(),
					TestingUtils.defaultExecutor(),
					config,
					highAvailabilityServices);

				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

				taskManager = TestingUtils.createTaskManager(
					testActorSystem,
					highAvailabilityServices,
					config,
					true,
					true);

				final ActorGateway jm = jobManger;

				new Within(deadline) {
					@Override
					protected void run() {
						try {
							ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);

							// Submit the job and wait until it is running
							JobClient.submitJobDetached(
								new AkkaJobManagerGateway(jm),
								config,
								jobGraph,
								Time.milliseconds(deadline.toMillis()),
								ClassLoader.getSystemClassLoader());

							jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);

							expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID()));

							// Get the ExecutionGraph
							jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor);

							ExecutionGraphFound executionGraphResponse =
								expectMsgClass(ExecutionGraphFound.class);

							ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
							ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());

							StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
								testActorSystem.dispatcher(), 60000);

							// Verify back pressure (clean up interval can be ignored)
							BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(
								coordinator,
								100 * 1000,
								20,
								Time.milliseconds(10L));

							int numAttempts = 10;

							int nextSampleId = 0;

							// Verify that all tasks are back pressured. This
							// can fail if the task takes longer to request
							// the buffer.
							for (int attempt = 0; attempt < numAttempts; attempt++) {
								try {
									OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);

									Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
									Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
									Assert.assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);

									for (int i = 0; i < parallelism; i++) {
										Assert.assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
									}

									nextSampleId = stats.getSampleId() + 1;

									break;
								} catch (Throwable t) {
									// Retry with a short pause; rethrow on the last attempt.
									if (attempt == numAttempts - 1) {
										throw t;
									} else {
										Thread.sleep(500);
									}
								}
							}

							//
							// 2) Release all buffers and let the tasks grab one
							//
							for (Buffer buf : buffers) {
								buf.recycle();
							}

							// Wait for all buffers to be available. The tasks
							// grab them and then immediately release them.
							while (testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
								Thread.sleep(100);
							}

							// Verify that no task is back pressured any more.
							for (int attempt = 0; attempt < numAttempts; attempt++) {
								try {
									OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);

									Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
									Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());

									// Verify that no task is back pressured
									for (int i = 0; i < parallelism; i++) {
										Assert.assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
									}

									break;
								} catch (Throwable t) {
									if (attempt == numAttempts - 1) {
										throw t;
									} else {
										Thread.sleep(500);
									}
								}
							}

							// Shut down
							jm.tell(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);

							// Cancel job
							jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));

							// Response to removal notification
							expectMsgEquals(true);

							//
							// 3) Trigger stats for archived job
							//
							statsTracker.invalidateOperatorStatsCache();
							Assert.assertFalse("Unexpected trigger", statsTracker.triggerStackTraceSample(vertex));

						} catch (Exception e) {
							e.printStackTrace();
							Assert.fail(e.getMessage());
						}
					}
				};
			} finally {
				TestingUtils.stopActor(jobManger);
				TestingUtils.stopActor(taskManager);

				highAvailabilityServices.closeAndCleanupAllData();

				// Recycle again defensively; Buffer recycling is presumably
				// idempotent here — TODO confirm against Buffer's contract.
				for (Buffer buf : buffers) {
					buf.recycle();
				}

				testBufferPool.lazyDestroy();
			}
		}};
	}

	/**
	 * Triggers a new stats sample and polls until the tracker has a result.
	 */
	private OperatorBackPressureStats triggerStatsSample(
			BackPressureStatsTracker statsTracker,
			ExecutionJobVertex vertex) throws InterruptedException {

		statsTracker.invalidateOperatorStatsCache();
		Assert.assertTrue("Failed to trigger", statsTracker.triggerStackTraceSample(vertex));

		// Sleep minimum duration
		Thread.sleep(20 * 10);

		Optional<OperatorBackPressureStats> stats;

		// Get the stats
		while (!(stats = statsTracker.getOperatorBackPressureStats(vertex)).isPresent()) {
			Thread.sleep(10);
		}

		return stats.get();
	}

	/**
	 * A back pressured producer sharing a {@link BufferPool} with the
	 * test driver.
	 */
	public static class BackPressuredTask extends AbstractInvokable {

		@Override
		public void invoke() throws Exception {
			while (true) {
				// Blocks while the test driver holds all buffers of the pool.
				Buffer buffer = testBufferPool.requestBufferBlocking();
				// Got a buffer, yay!
				buffer.recycle();

				// Park until the sample window has passed (latch never counts down).
				new CountDownLatch(1).await();
			}
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Tests for the {@link BackPressureStatsTracker} with a mocked sample
 * coordinator and fake stack traces.
 */
public class BackPressureStatsTrackerTest extends TestLogger {

	/** Tests simple statistics with fake stack traces. */
	@Test
	@SuppressWarnings("unchecked")
	public void testTriggerStackTraceSample() throws Exception {
		final CompletableFuture<StackTraceSample> sampleFuture = new CompletableFuture<>();

		final StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class);
		Mockito.when(sampleCoordinator.triggerStackTraceSample(
			Matchers.any(ExecutionVertex[].class),
			Matchers.anyInt(),
			Matchers.any(Time.class),
			Matchers.anyInt())).thenReturn(sampleFuture);

		final ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
		Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);

		// Same-thread execution context so callbacks run synchronously.
		final Executor directExecutor = Runnable::run;
		Mockito.when(graph.getFutureExecutor()).thenReturn(directExecutor);

		final ExecutionVertex[] taskVertices = new ExecutionVertex[4];

		final ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class);
		Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
		Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
		Mockito.when(jobVertex.getGraph()).thenReturn(graph);
		Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);

		for (int subtask = 0; subtask < taskVertices.length; subtask++) {
			taskVertices[subtask] = mockExecutionVertex(jobVertex, subtask);
		}

		final int numSamples = 100;
		final Time delayBetweenSamples = Time.milliseconds(100L);

		final BackPressureStatsTracker tracker = new BackPressureStatsTracker(
			sampleCoordinator, 9999, numSamples, delayBetweenSamples);

		// Trigger
		Assert.assertTrue("Failed to trigger", tracker.triggerStackTraceSample(jobVertex));

		Mockito.verify(sampleCoordinator).triggerStackTraceSample(
			Matchers.eq(taskVertices),
			Matchers.eq(numSamples),
			Matchers.eq(delayBetweenSamples),
			Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));

		// Triggering again while a request is pending must not fire a new sample.
		Assert.assertFalse("Unexpected trigger", tracker.triggerStackTraceSample(jobVertex));

		Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());

		Mockito.verify(sampleCoordinator).triggerStackTraceSample(
			Matchers.eq(taskVertices),
			Matchers.eq(numSamples),
			Matchers.eq(delayBetweenSamples),
			Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));

		Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());

		// Build fake traces: subtask i is back pressured in traces 0..i.
		final Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByAttempt = new HashMap<>();
		for (ExecutionVertex vertex : taskVertices) {
			final List<StackTraceElement[]> taskTraces = new ArrayList<>();

			for (int i = 0; i < taskVertices.length; i++) {
				// Traces until sub task index are back pressured
				taskTraces.add(createStackTrace(i <= vertex.getParallelSubtaskIndex()));
			}

			tracesByAttempt.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
		}

		final int sampleId = 1231;
		final int endTime = 841;

		final StackTraceSample sample = new StackTraceSample(
			sampleId,
			0,
			endTime,
			tracesByAttempt);

		// Succeed the promise
		sampleFuture.complete(sample);

		Assert.assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());

		final OperatorBackPressureStats stats = tracker.getOperatorBackPressureStats(jobVertex).get();

		// Verify the stats
		Assert.assertEquals(sampleId, stats.getSampleId());
		Assert.assertEquals(endTime, stats.getEndTimestamp());
		Assert.assertEquals(taskVertices.length, stats.getNumberOfSubTasks());

		for (int i = 0; i < taskVertices.length; i++) {
			// Traces until sub task index are back pressured
			Assert.assertEquals((i + 1) / 4.0, stats.getBackPressureRatio(i), 0.0);
		}
	}

	/**
	 * Returns a fake trace; when back pressured, the top frame matches the
	 * class/method the tracker looks for.
	 */
	private StackTraceElement[] createStackTrace(boolean isBackPressure) {
		if (!isBackPressure) {
			return Thread.currentThread().getStackTrace();
		}

		return new StackTraceElement[] {
			new StackTraceElement(
				BackPressureStatsTracker.EXPECTED_CLASS_NAME,
				BackPressureStatsTracker.EXPECTED_METHOD_NAME,
				"LocalBufferPool.java",
				133)
		};
	}

	/** Mocks an execution vertex with the given subtask index. */
	private ExecutionVertex mockExecutionVertex(
			ExecutionJobVertex jobVertex,
			int subTaskIndex) {

		final Execution execution = Mockito.mock(Execution.class);
		Mockito.when(execution.getAttemptId()).thenReturn(new ExecutionAttemptID());

		final ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
		Mockito.when(vertex.getJobvertexId()).thenReturn(jobVertex.getJobVertexId());
		Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(execution);
		Mockito.when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);

		return vertex;
	}

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaJobManagerGateway; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import 
scala.concurrent.duration.FiniteDuration; + +import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning; +import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound; +import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; +import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; + +/** + * Simple stack trace sampling test. + */ +public class StackTraceSampleCoordinatorITCase extends TestLogger { + + private static ActorSystem testActorSystem; + + @BeforeClass + public static void setup() { + testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(testActorSystem); + } + + /** + * Tests that a cleared task is answered with a partial success response. + */ + @Test + public void testTaskClearedWhileSampling() throws Exception { + new JavaTestKit(testActorSystem) {{ + final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS); + + // The JobGraph + final JobGraph jobGraph = new JobGraph(); + final int parallelism = 1; + + final JobVertex task = new JobVertex("Task"); + task.setInvokableClass(BlockingNoOpInvokable.class); + task.setParallelism(parallelism); + + jobGraph.addVertex(task); + + final Configuration config = new Configuration(); + + final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( + config, + TestingUtils.defaultExecutor()); + + ActorGateway jobManger = null; + ActorGateway taskManager = null; + + try { + jobManger = TestingUtils.createJobManager( + testActorSystem, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + config, + highAvailabilityServices); + + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); + + taskManager = 
TestingUtils.createTaskManager( + testActorSystem, + highAvailabilityServices, + config, + true, + true); + + final ActorGateway jm = jobManger; + + new Within(deadline) { + @Override + protected void run() { + try { + ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); + + int maxAttempts = 10; + int sleepTime = 100; + + for (int i = 0; i < maxAttempts; i++, sleepTime *= 2) { + // Submit the job and wait until it is running + JobClient.submitJobDetached( + new AkkaJobManagerGateway(jm), + config, + jobGraph, + Time.milliseconds(deadline.toMillis()), + ClassLoader.getSystemClassLoader()); + + jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor); + + expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID())); + + // Get the ExecutionGraph + jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor); + ExecutionGraphFound executionGraphResponse = + expectMsgClass(ExecutionGraphFound.class); + ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph(); + ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID()); + + StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator( + testActorSystem.dispatcher(), 60000); + + CompletableFuture<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample( + vertex.getTaskVertices(), + // Do this often so we have a good + // chance of removing the job during + // sampling. + 21474700 * 100, + Time.milliseconds(10L), + 0); + + // Wait before cancelling so that some samples + // are actually taken. + Thread.sleep(sleepTime); + + // Cancel job + Future<?> removeFuture = jm.ask( + new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), + remaining()); + + jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); + + try { + // Throws Exception on failure + sampleFuture.get(remaining().toMillis(), TimeUnit.MILLISECONDS); + + // OK, we are done. 
Got the expected + // partial result. + break; + } catch (Throwable t) { + // We were too fast in cancelling the job. + // Fall through and retry. + } finally { + Await.ready(removeFuture, remaining()); + } + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + }; + } finally { + TestingUtils.stopActor(jobManger); + TestingUtils.stopActor(taskManager); + + highAvailabilityServices.closeAndCleanupAllData(); + } + }}; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java new file mode 100644 index 0000000..786b0ae --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample; +import org.apache.flink.runtime.messages.StackTraceSampleResponse; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorSystem; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Test for the {@link StackTraceSampleCoordinator}. 
+ */ +public class StackTraceSampleCoordinatorTest extends TestLogger { + + private static ActorSystem system; + + private StackTraceSampleCoordinator coord; + + @BeforeClass + public static void setUp() throws Exception { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void tearDown() throws Exception { + if (system != null) { + system.shutdown(); + } + } + + @Before + public void init() throws Exception { + this.coord = new StackTraceSampleCoordinator(system.dispatcher(), 60000); + } + + /** Tests simple trigger and collect of stack trace samples. */ + @Test + public void testTriggerStackTraceSample() throws Exception { + ExecutionVertex[] vertices = new ExecutionVertex[] { + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true) + }; + + int numSamples = 1; + Time delayBetweenSamples = Time.milliseconds(100L); + int maxStackTraceDepth = 0; + + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + vertices, numSamples, delayBetweenSamples, maxStackTraceDepth); + + // Verify messages have been sent + for (ExecutionVertex vertex : vertices) { + ExecutionAttemptID expectedExecutionId = vertex + .getCurrentExecutionAttempt().getAttemptId(); + + TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample( + 0, + expectedExecutionId, + numSamples, + delayBetweenSamples, + maxStackTraceDepth); + + Mockito.verify(vertex.getCurrentExecutionAttempt()) + .requestStackTraceSample(Matchers.eq(0), Matchers.eq(numSamples), Matchers.eq(delayBetweenSamples), Matchers.eq(maxStackTraceDepth), Matchers.any(Time.class)); + } + + Assert.assertFalse(sampleFuture.isDone()); + + StackTraceElement[] stackTraceSample = 
Thread.currentThread().getStackTrace(); + List<StackTraceElement[]> traces = new ArrayList<>(); + traces.add(stackTraceSample); + traces.add(stackTraceSample); + traces.add(stackTraceSample); + + // Collect stack traces + for (int i = 0; i < vertices.length; i++) { + ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId(); + coord.collectStackTraces(0, executionId, traces); + + if (i == vertices.length - 1) { + Assert.assertTrue(sampleFuture.isDone()); + } else { + Assert.assertFalse(sampleFuture.isDone()); + } + } + + // Verify completed stack trace sample + StackTraceSample sample = sampleFuture.get(); + + Assert.assertEquals(0, sample.getSampleId()); + Assert.assertTrue(sample.getEndTime() >= sample.getStartTime()); + + Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces(); + + for (ExecutionVertex vertex : vertices) { + ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId(); + List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId); + + Assert.assertNotNull("Task not found", sampleTraces); + Assert.assertTrue(traces.equals(sampleTraces)); + } + + // Verify no more pending sample + Assert.assertEquals(0, coord.getNumberOfPendingSamples()); + + // Verify no error on late collect + coord.collectStackTraces(0, vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces); + } + + /** Tests triggering for non-running tasks fails the future. 
*/ + @Test + public void testTriggerStackTraceSampleNotRunningTasks() throws Exception { + ExecutionVertex[] vertices = new ExecutionVertex[] { + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true) + }; + + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + vertices, + 1, + Time.milliseconds(100L), + 0); + + Assert.assertTrue(sampleFuture.isDone()); + + try { + sampleFuture.get(); + Assert.fail("Expected exception."); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof IllegalStateException); + } + } + + /** Tests triggering for reset tasks fails the future. */ + @Test(timeout = 1000L) + public void testTriggerStackTraceSampleResetRunningTasks() throws Exception { + ExecutionVertex[] vertices = new ExecutionVertex[] { + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + // Fails to send the message to the execution (happens when execution is reset) + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false) + }; + + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + vertices, + 1, + Time.milliseconds(100L), + 0); + + try { + sampleFuture.get(); + Assert.fail("Expected exception."); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof RuntimeException); + } + } + + /** Tests that samples time out if they don't finish in time. 
*/ + @Test(timeout = 1000L) + public void testTriggerStackTraceSampleTimeout() throws Exception { + int timeout = 100; + + coord = new StackTraceSampleCoordinator(system.dispatcher(), timeout); + + final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try { + + ExecutionVertex[] vertices = new ExecutionVertex[]{ + mockExecutionVertexWithTimeout( + new ExecutionAttemptID(), + ExecutionState.RUNNING, + scheduledExecutorService, + timeout) + }; + + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + vertices, 1, Time.milliseconds(100L), 0); + + // Wait for the timeout + Thread.sleep(timeout * 2); + + boolean success = false; + for (int i = 0; i < 10; i++) { + if (sampleFuture.isDone()) { + success = true; + break; + } + + Thread.sleep(timeout); + } + + Assert.assertTrue("Sample did not time out", success); + + try { + sampleFuture.get(); + Assert.fail("Expected exception."); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause().getCause().getMessage().contains("Timeout")); + } + + // Collect after the timeout (should be ignored) + ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId(); + coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>()); + } finally { + scheduledExecutorService.shutdownNow(); + } + } + + /** Tests that collecting an unknown sample is ignored. */ + @Test + public void testCollectStackTraceForUnknownSample() throws Exception { + coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>()); + } + + /** Tests cancelling of a pending sample. 
*/ + @Test + public void testCancelStackTraceSample() throws Exception { + ExecutionVertex[] vertices = new ExecutionVertex[] { + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + }; + + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + vertices, 1, Time.milliseconds(100L), 0); + + Assert.assertFalse(sampleFuture.isDone()); + + // Cancel + coord.cancelStackTraceSample(0, null); + + // Verify completed + Assert.assertTrue(sampleFuture.isDone()); + + // Verify no more pending samples + Assert.assertEquals(0, coord.getNumberOfPendingSamples()); + } + + /** Tests that collecting for a cancelled sample throws no Exception. */ + @Test + public void testCollectStackTraceForCanceledSample() throws Exception { + ExecutionVertex[] vertices = new ExecutionVertex[] { + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + }; + + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + vertices, 1, Time.milliseconds(100L), 0); + + Assert.assertFalse(sampleFuture.isDone()); + + coord.cancelStackTraceSample(0, null); + + Assert.assertTrue(sampleFuture.isDone()); + + // Verify no error on late collect + ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId(); + coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>()); + } + + /** Tests that collecting for a cancelled sample throws no Exception. 
*/ + @Test + public void testCollectForDiscardedPendingSample() throws Exception { + ExecutionVertex[] vertices = new ExecutionVertex[] { + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + }; + + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + vertices, 1, Time.milliseconds(100L), 0); + + Assert.assertFalse(sampleFuture.isDone()); + + coord.cancelStackTraceSample(0, null); + + Assert.assertTrue(sampleFuture.isDone()); + + // Verify no error on late collect + ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId(); + coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>()); + } + + + /** Tests that collecting for a unknown task fails. */ + @Test(expected = IllegalArgumentException.class) + public void testCollectStackTraceForUnknownTask() throws Exception { + ExecutionVertex[] vertices = new ExecutionVertex[] { + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + }; + + coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0); + + coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>()); + } + + /** Tests that shut down fails all pending samples and future sample triggers. 
*/ + @Test + public void testShutDown() throws Exception { + ExecutionVertex[] vertices = new ExecutionVertex[] { + mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), + }; + + List<CompletableFuture<StackTraceSample>> sampleFutures = new ArrayList<>(); + + // Trigger + sampleFutures.add(coord.triggerStackTraceSample( + vertices, 1, Time.milliseconds(100L), 0)); + + sampleFutures.add(coord.triggerStackTraceSample( + vertices, 1, Time.milliseconds(100L), 0)); + + for (CompletableFuture<StackTraceSample> future : sampleFutures) { + Assert.assertFalse(future.isDone()); + } + + // Shut down + coord.shutDown(); + + // Verify all completed + for (CompletableFuture<StackTraceSample> future : sampleFutures) { + Assert.assertTrue(future.isDone()); + } + + // Verify new trigger returns failed future + CompletableFuture<StackTraceSample> future = coord.triggerStackTraceSample( + vertices, 1, Time.milliseconds(100L), 0); + + Assert.assertTrue(future.isDone()); + + try { + future.get(); + Assert.fail("Expected exception."); + } catch (ExecutionException e) { + // we expected an exception here :-) + } + + } + + // ------------------------------------------------------------------------ + + private ExecutionVertex mockExecutionVertex( + ExecutionAttemptID executionId, + ExecutionState state, + boolean sendSuccess) { + + Execution exec = Mockito.mock(Execution.class); + CompletableFuture<StackTraceSampleResponse> failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new Exception("Send failed.")); + + Mockito.when(exec.getAttemptId()).thenReturn(executionId); + Mockito.when(exec.getState()).thenReturn(state); + Mockito.when(exec.requestStackTraceSample(Matchers.anyInt(), Matchers.anyInt(), Matchers.any(Time.class), Matchers.anyInt(), Matchers.any(Time.class))) + .thenReturn( + sendSuccess ? 
+ CompletableFuture.completedFuture(Mockito.mock(StackTraceSampleResponse.class)) : + failedFuture); + + ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class); + Mockito.when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); + Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); + + return vertex; + } + + private ExecutionVertex mockExecutionVertexWithTimeout( + ExecutionAttemptID executionId, + ExecutionState state, + ScheduledExecutorService scheduledExecutorService, + int timeout) { + + final CompletableFuture<StackTraceSampleResponse> future = new CompletableFuture<>(); + + Execution exec = Mockito.mock(Execution.class); + Mockito.when(exec.getAttemptId()).thenReturn(executionId); + Mockito.when(exec.getState()).thenReturn(state); + Mockito.when(exec.requestStackTraceSample(Matchers.anyInt(), Matchers.anyInt(), Matchers.any(Time.class), Matchers.anyInt(), Matchers.any(Time.class))) + .thenReturn(future); + + scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + future.completeExceptionally(new TimeoutException("Timeout")); + } + }, timeout, TimeUnit.MILLISECONDS); + + ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class); + Mockito.when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); + Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); + + return vertex; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java new file mode 100644 index 0000000..db91f58 --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.checkpoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import 
static org.mockito.Mockito.when; + +/** + * Tests for the CheckpointConfigHandler. + */ +public class CheckpointConfigHandlerTest { + + @Test + public void testArchiver() throws IOException { + JsonArchivist archivist = new CheckpointConfigHandler.CheckpointConfigJsonArchivist(); + GraphAndSettings graphAndSettings = createGraphAndSettings(true, true); + + AccessExecutionGraph graph = graphAndSettings.graph; + when(graph.getJobID()).thenReturn(new JobID()); + JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings; + ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings; + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); + Assert.assertEquals(1, archives.size()); + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + graph.getJobID() + "/checkpoints/config", archive.getPath()); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(archive.getJson()); + + Assert.assertEquals("exactly_once", rootNode.get("mode").asText()); + Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong()); + Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong()); + Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong()); + Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt()); + + JsonNode externalizedNode = rootNode.get("externalization"); + Assert.assertNotNull(externalizedNode); + Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean()); + Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean()); + + } + + @Test + public void testGetPaths() { + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + String[] 
paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]); + } + + /** + * Tests a simple config. + */ + @Test + public void testSimpleConfig() throws Exception { + GraphAndSettings graphAndSettings = createGraphAndSettings(false, true); + + AccessExecutionGraph graph = graphAndSettings.graph; + JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings; + + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(json); + + assertEquals("exactly_once", rootNode.get("mode").asText()); + assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong()); + assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong()); + assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong()); + assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt()); + + JsonNode externalizedNode = rootNode.get("externalization"); + assertNotNull(externalizedNode); + assertEquals(false, externalizedNode.get("enabled").asBoolean()); + } + + /** + * Tests the that the isExactlyOnce flag is respected. 
+ */ + @Test + public void testAtLeastOnce() throws Exception { + GraphAndSettings graphAndSettings = createGraphAndSettings(false, false); + + AccessExecutionGraph graph = graphAndSettings.graph; + + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(json); + + assertEquals("at_least_once", rootNode.get("mode").asText()); + } + + /** + * Tests that the externalized checkpoint settings are forwarded. + */ + @Test + public void testEnabledExternalizedCheckpointSettings() throws Exception { + GraphAndSettings graphAndSettings = createGraphAndSettings(true, false); + + AccessExecutionGraph graph = graphAndSettings.graph; + ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings; + + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode externalizedNode = mapper.readTree(json).get("externalization"); + assertNotNull(externalizedNode); + assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean()); + assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean()); + } + + private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) { + long interval = 18231823L; + long timeout = 996979L; + long minPause = 119191919L; + int maxConcurrent = 12929329; + ExternalizedCheckpointSettings externalizedSetting = externalized + ? 
ExternalizedCheckpointSettings.externalizeCheckpoints(true) + : ExternalizedCheckpointSettings.none(); + + JobCheckpointingSettings settings = new JobCheckpointingSettings( + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + interval, + timeout, + minPause, + maxConcurrent, + externalizedSetting, + null, + exactlyOnce); + + AccessExecutionGraph graph = mock(AccessExecutionGraph.class); + when(graph.getJobCheckpointingSettings()).thenReturn(settings); + + return new GraphAndSettings(graph, settings, externalizedSetting); + } + + private static class GraphAndSettings { + public final AccessExecutionGraph graph; + public final JobCheckpointingSettings snapshottingSettings; + public final ExternalizedCheckpointSettings externalizedSettings; + + public GraphAndSettings( + AccessExecutionGraph graph, + JobCheckpointingSettings snapshottingSettings, + ExternalizedCheckpointSettings externalizedSettings) { + this.graph = graph; + this.snapshottingSettings = snapshottingSettings; + this.externalizedSettings = externalizedSettings; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java new file mode 100644 index 0000000..04b1c55 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.checkpoints; + +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the CheckpointStatsCache. 
+ */ +public class CheckpointStatsCacheTest { + + @Test + public void testZeroSizeCache() throws Exception { + AbstractCheckpointStats checkpoint = createCheckpoint(0, CheckpointStatsStatus.COMPLETED); + + CheckpointStatsCache cache = new CheckpointStatsCache(0); + cache.tryAdd(checkpoint); + assertNull(cache.tryGet(0L)); + } + + @Test + public void testCacheAddAndGet() throws Exception { + AbstractCheckpointStats chk0 = createCheckpoint(0, CheckpointStatsStatus.COMPLETED); + AbstractCheckpointStats chk1 = createCheckpoint(1, CheckpointStatsStatus.COMPLETED); + AbstractCheckpointStats chk2 = createCheckpoint(2, CheckpointStatsStatus.IN_PROGRESS); + + CheckpointStatsCache cache = new CheckpointStatsCache(1); + cache.tryAdd(chk0); + assertEquals(chk0, cache.tryGet(0)); + + cache.tryAdd(chk1); + assertNull(cache.tryGet(0)); + assertEquals(chk1, cache.tryGet(1)); + + cache.tryAdd(chk2); + assertNull(cache.tryGet(2)); + assertNull(cache.tryGet(0)); + assertEquals(chk1, cache.tryGet(1)); + } + + private AbstractCheckpointStats createCheckpoint(long id, CheckpointStatsStatus status) { + AbstractCheckpointStats checkpoint = mock(AbstractCheckpointStats.class); + when(checkpoint.getCheckpointId()).thenReturn(id); + when(checkpoint.getStatus()).thenReturn(status); + return checkpoint; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java new file mode 100644 index 0000000..e614608 --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.checkpoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; +import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; +import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; 
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the CheckpointStatsDetailsHandler. + */ +public class CheckpointStatsDetailsHandlerTest { + + @Test + public void testArchiver() throws IOException { + JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(); + + CompletedCheckpointStats completedCheckpoint = createCompletedCheckpoint(); + FailedCheckpointStats failedCheckpoint = createFailedCheckpoint(); + List<AbstractCheckpointStats> checkpoints = new ArrayList<>(); + checkpoints.add(failedCheckpoint); + checkpoints.add(completedCheckpoint); + + CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); + when(history.getCheckpoints()).thenReturn(checkpoints); + CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); + when(snapshot.getHistory()).thenReturn(history); + + AccessExecutionGraph graph = mock(AccessExecutionGraph.class); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); + when(graph.getJobID()).thenReturn(new JobID()); + + ObjectMapper mapper = new ObjectMapper(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); + Assert.assertEquals(2, archives.size()); + + Iterator<ArchivedJson> iterator = archives.iterator(); + ArchivedJson 
archive1 = iterator.next(); + Assert.assertEquals( + "/jobs/" + graph.getJobID() + "/checkpoints/details/" + failedCheckpoint.getCheckpointId(), + archive1.getPath()); + compareFailedCheckpoint(failedCheckpoint, mapper.readTree(archive1.getJson())); + + ArchivedJson archive2 = iterator.next(); + Assert.assertEquals( + "/jobs/" + graph.getJobID() + "/checkpoints/details/" + completedCheckpoint.getCheckpointId(), + archive2.getPath()); + compareCompletedCheckpoint(completedCheckpoint, mapper.readTree(archive2.getJson())); + } + + @Test + public void testGetPaths() { + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]); + } + + /** + * Tests request with illegal checkpoint ID param. + */ + @Test + public void testIllegalCheckpointId() throws Exception { + AccessExecutionGraph graph = mock(AccessExecutionGraph.class); + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + Map<String, String> params = new HashMap<>(); + params.put("checkpointid", "illegal checkpoint"); + String json = handler.handleRequest(graph, params).get(); + + assertEquals("{}", json); + } + + /** + * Tests request with missing checkpoint ID param. 
+ */ + @Test + public void testNoCheckpointIdParam() throws Exception { + AccessExecutionGraph graph = mock(AccessExecutionGraph.class); + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); + + assertEquals("{}", json); + } + + /** + * Test lookup of not existing checkpoint in history. + */ + @Test + public void testCheckpointNotFound() throws Exception { + CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); + when(history.getCheckpointById(anyLong())).thenReturn(null); // not found + + CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); + when(snapshot.getHistory()).thenReturn(history); + + AccessExecutionGraph graph = mock(AccessExecutionGraph.class); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); + + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + Map<String, String> params = new HashMap<>(); + params.put("checkpointid", "123"); + String json = handler.handleRequest(graph, params).get(); + + assertEquals("{}", json); + verify(history, times(1)).getCheckpointById(anyLong()); + } + + /** + * Tests a checkpoint details request for an in progress checkpoint. 
+ */ + @Test + public void testCheckpointDetailsRequestInProgressCheckpoint() throws Exception { + PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class); + when(checkpoint.getCheckpointId()).thenReturn(1992139L); + when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); + when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); + when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L); + when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L); + when(checkpoint.getStateSize()).thenReturn(111939272822L); + when(checkpoint.getEndToEndDuration()).thenReturn(121191L); + when(checkpoint.getAlignmentBuffered()).thenReturn(1L); + when(checkpoint.getNumberOfSubtasks()).thenReturn(501); + when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(101); + + List<TaskStateStats> taskStats = new ArrayList<>(); + TaskStateStats task1 = createTaskStateStats(); + TaskStateStats task2 = createTaskStateStats(); + taskStats.add(task1); + taskStats.add(task2); + + when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats); + + JsonNode rootNode = triggerRequest(checkpoint); + + assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); + assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); + assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); + assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); + assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); + assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); + assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong()); + assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong()); + assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); + 
assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt()); + + verifyTaskNodes(taskStats, rootNode); + } + + /** + * Tests a checkpoint details request for a completed checkpoint. + */ + @Test + public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception { + CompletedCheckpointStats checkpoint = createCompletedCheckpoint(); + + JsonNode rootNode = triggerRequest(checkpoint); + + compareCompletedCheckpoint(checkpoint, rootNode); + + verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode); + } + + /** + * Tests a checkpoint details request for a failed checkpoint. + */ + @Test + public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception { + FailedCheckpointStats checkpoint = createFailedCheckpoint(); + + JsonNode rootNode = triggerRequest(checkpoint); + + compareFailedCheckpoint(checkpoint, rootNode); + + verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode); + } + + // ------------------------------------------------------------------------ + + private static CompletedCheckpointStats createCompletedCheckpoint() { + CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class); + when(checkpoint.getCheckpointId()).thenReturn(1818213L); + when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED); + when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint()); + when(checkpoint.getTriggerTimestamp()).thenReturn(1818L); + when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L); + when(checkpoint.getStateSize()).thenReturn(925281L); + when(checkpoint.getEndToEndDuration()).thenReturn(181819L); + when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L); + when(checkpoint.getNumberOfSubtasks()).thenReturn(181271); + when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821); + when(checkpoint.isDiscarded()).thenReturn(true); + 
when(checkpoint.getExternalPath()).thenReturn("checkpoint-external-path"); + + List<TaskStateStats> taskStats = new ArrayList<>(); + TaskStateStats task1 = createTaskStateStats(); + TaskStateStats task2 = createTaskStateStats(); + taskStats.add(task1); + taskStats.add(task2); + + when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats); + + return checkpoint; + } + + private static void compareCompletedCheckpoint(CompletedCheckpointStats checkpoint, JsonNode rootNode) { + assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); + assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); + assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); + assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); + assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); + assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); + assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong()); + assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong()); + assertEquals(checkpoint.isDiscarded(), rootNode.get("discarded").asBoolean()); + assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText()); + assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); + assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt()); + } + + private static FailedCheckpointStats createFailedCheckpoint() { + FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class); + when(checkpoint.getCheckpointId()).thenReturn(1818214L); + when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED); + when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint()); + when(checkpoint.getTriggerTimestamp()).thenReturn(1818L); 
+ when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L); + when(checkpoint.getStateSize()).thenReturn(925281L); + when(checkpoint.getEndToEndDuration()).thenReturn(181819L); + when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L); + when(checkpoint.getNumberOfSubtasks()).thenReturn(181271); + when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821); + when(checkpoint.getFailureTimestamp()).thenReturn(123012890312093L); + when(checkpoint.getFailureMessage()).thenReturn("failure-message"); + + List<TaskStateStats> taskStats = new ArrayList<>(); + TaskStateStats task1 = createTaskStateStats(); + TaskStateStats task2 = createTaskStateStats(); + taskStats.add(task1); + taskStats.add(task2); + + when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats); + + return checkpoint; + } + + private static void compareFailedCheckpoint(FailedCheckpointStats checkpoint, JsonNode rootNode) { + assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); + assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); + assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); + assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); + assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); + assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); + assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong()); + assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong()); + assertEquals(checkpoint.getFailureTimestamp(), rootNode.get("failure_timestamp").asLong()); + assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText()); + assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); + assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt()); + } + + private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception { + CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); + when(history.getCheckpointById(anyLong())).thenReturn(checkpoint); + CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); + when(snapshot.getHistory()).thenReturn(history); + + AccessExecutionGraph graph = mock(AccessExecutionGraph.class); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); + + CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); + Map<String, String> params = new HashMap<>(); + params.put("checkpointid", "123"); + String json = handler.handleRequest(graph, params).get(); + + ObjectMapper mapper = new ObjectMapper(); + return mapper.readTree(json); + } + + private static void verifyTaskNodes(Collection<TaskStateStats> tasks, JsonNode parentNode) { + for (TaskStateStats task : tasks) { + long duration = ThreadLocalRandom.current().nextInt(128); + + JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString()); + assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong()); + assertEquals(task.getStateSize(), taskNode.get("state_size").asLong()); + assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong()); + assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong()); + assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt()); + assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt()); + } + } + + private static TaskStateStats createTaskStateStats() { + ThreadLocalRandom rand = ThreadLocalRandom.current(); + + TaskStateStats task = mock(TaskStateStats.class); + 
when(task.getJobVertexId()).thenReturn(new JobVertexID()); + when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1); + when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1); + when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1); + when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1); + when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) + 1); + when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(rand.nextInt(1024) + 1); + return task; + } +}
