http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java new file mode 100644 index 0000000..8f21af0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the JobCancellationWithSavepointHandler. 
+ */ +public class JobCancellationWithSavepointHandlersTest extends TestLogger { + + private static final Executor executor = Executors.directExecutor(); + + @Test + public void testGetPaths() { + JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor); + + JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler(); + String[] triggerPaths = triggerHandler.getPaths(); + Assert.assertEquals(2, triggerPaths.length); + List<String> triggerPathsList = Arrays.asList(triggerPaths); + Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint")); + Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory")); + + JobCancellationWithSavepointHandlers.InProgressHandler progressHandler = handler.getInProgressHandler(); + String[] progressPaths = progressHandler.getPaths(); + Assert.assertEquals(1, progressPaths.length); + Assert.assertEquals("/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId", progressPaths[0]); + } + + /** + * Tests that the cancellation ask timeout respects the checkpoint timeout. + * Otherwise, AskTimeoutExceptions are bound to happen for large state. 
+ */ + @Test + public void testAskTimeoutEqualsCheckpointTimeout() throws Exception { + long timeout = 128288238L; + JobID jobId = new JobID(); + ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraph graph = mock(ExecutionGraph.class); + CheckpointCoordinator coord = mock(CheckpointCoordinator.class); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(graph.getCheckpointCoordinator()).thenReturn(coord); + when(coord.getCheckpointTimeout()).thenReturn(timeout); + + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); + JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler(); + + Map<String, String> params = new HashMap<>(); + params.put("jobid", jobId.toString()); + params.put("targetDirectory", "placeholder"); + + JobManagerGateway jobManager = mock(JobManagerGateway.class); + when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar")); + + handler.handleRequest(params, Collections.emptyMap(), jobManager); + + verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class)); + } + + /** + * Tests that the savepoint directory configuration is respected. 
+ */ + @Test + public void testSavepointDirectoryConfiguration() throws Exception { + long timeout = 128288238L; + JobID jobId = new JobID(); + ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraph graph = mock(ExecutionGraph.class); + CheckpointCoordinator coord = mock(CheckpointCoordinator.class); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(graph.getCheckpointCoordinator()).thenReturn(coord); + when(coord.getCheckpointTimeout()).thenReturn(timeout); + + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory"); + JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler(); + + Map<String, String> params = new HashMap<>(); + params.put("jobid", jobId.toString()); + + JobManagerGateway jobManager = mock(JobManagerGateway.class); + when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar")); + + // 1. Use targetDirectory path param + params.put("targetDirectory", "custom-directory"); + handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); + + // 2. Use default + params.remove("targetDirectory"); + + handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class)); + + // 3. 
Throw Exception + handlers = new JobCancellationWithSavepointHandlers(holder, executor, null); + handler = handlers.getTriggerHandler(); + + try { + handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); + fail("Did not throw expected test Exception"); + } catch (Exception e) { + IllegalStateException cause = (IllegalStateException) e.getCause(); + assertEquals(true, cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key())); + } + } + + /** + * Tests triggering a new request and monitoring it. + */ + @Test + public void testTriggerNewRequest() throws Exception { + JobID jobId = new JobID(); + ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraph graph = mock(ExecutionGraph.class); + CheckpointCoordinator coord = mock(CheckpointCoordinator.class); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(graph.getCheckpointCoordinator()).thenReturn(coord); + + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); + JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler(); + JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler(); + + Map<String, String> params = new HashMap<>(); + params.put("jobid", jobId.toString()); + params.put("targetDirectory", "custom-directory"); + + JobManagerGateway jobManager = mock(JobManagerGateway.class); + + // Successful + CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>(); + when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint); + + // Trigger + FullHttpResponse response = trigger.handleRequest(params, Collections.emptyMap(), jobManager).get(); + + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), 
any(Time.class)); + + String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId); + + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION)); + + String json = response.content().toString(Charset.forName("UTF-8")); + JsonNode root = new ObjectMapper().readTree(json); + + assertEquals("accepted", root.get("status").asText()); + assertEquals("1", root.get("request-id").asText()); + assertEquals(location, root.get("location").asText()); + + // Trigger again + response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION)); + + json = response.content().toString(Charset.forName("UTF-8")); + root = new ObjectMapper().readTree(json); + + assertEquals("accepted", root.get("status").asText()); + assertEquals("1", root.get("request-id").asText()); + assertEquals(location, root.get("location").asText()); + + // Only single actual request + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); + + // Query progress + params.put("requestId", "1"); + + response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + assertEquals("application/json; charset=UTF-8", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + json = response.content().toString(Charset.forName("UTF-8")); + root = new ObjectMapper().readTree(json); + + assertEquals("in-progress", root.get("status").asText()); + assertEquals("1", root.get("request-id").asText()); + + // Complete + successfulCancelWithSavepoint.complete("_path-savepoint_"); + + response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); + + assertEquals(HttpResponseStatus.CREATED, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + json = response.content().toString(Charset.forName("UTF-8")); + + root = new ObjectMapper().readTree(json); + + assertEquals("success", root.get("status").asText()); + assertEquals("1", root.get("request-id").asText()); + assertEquals("_path-savepoint_", root.get("savepoint-path").asText()); + + // Query again, keep recent history + + response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); + + assertEquals(HttpResponseStatus.CREATED, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + json = response.content().toString(Charset.forName("UTF-8")); + + root = new ObjectMapper().readTree(json); + + assertEquals("success", root.get("status").asText()); + assertEquals("1", root.get("request-id").asText()); + assertEquals("_path-savepoint_", root.get("savepoint-path").asText()); + + // Query for unknown request + params.put("requestId", 
"9929"); + + response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); + assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + json = response.content().toString(Charset.forName("UTF-8")); + + root = new ObjectMapper().readTree(json); + + assertEquals("failed", root.get("status").asText()); + assertEquals("9929", root.get("request-id").asText()); + assertEquals("Unknown job/request ID", root.get("cause").asText()); + } + + /** + * Tests response when a request fails. + */ + @Test + public void testFailedCancellation() throws Exception { + JobID jobId = new JobID(); + ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraph graph = mock(ExecutionGraph.class); + CheckpointCoordinator coord = mock(CheckpointCoordinator.class); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(graph.getCheckpointCoordinator()).thenReturn(coord); + + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); + JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler(); + JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler(); + + Map<String, String> params = new HashMap<>(); + params.put("jobid", jobId.toString()); + params.put("targetDirectory", "custom-directory"); + + JobManagerGateway jobManager = mock(JobManagerGateway.class); + + // Successful + CompletableFuture<String> unsuccessfulCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception")); + when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), 
any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint); + + // Trigger + trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); + + // Query progress + params.put("requestId", "1"); + + FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + String json = response.content().toString(Charset.forName("UTF-8")); + JsonNode root = new ObjectMapper().readTree(json); + + assertEquals("failed", root.get("status").asText()); + assertEquals("1", root.get("request-id").asText()); + assertEquals("Test Exception", root.get("cause").asText()); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java new file mode 100644 index 0000000..567df8c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;

import com.fasterxml.jackson.databind.JsonNode;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import static org.mockito.Mockito.mock;

/**
 * Tests for the JobConfigHandler.
 */
public class JobConfigHandlerTest {

    /**
     * Tests that the archivist produces exactly one archive, stored under the job's config path,
     * whose JSON matches the archived job.
     */
    @Test
    public void testArchiver() throws Exception {
        JsonArchivist archivist = new JobConfigHandler.JobConfigJsonArchivist();
        AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();

        Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
        Assert.assertEquals(1, archives.size());

        ArchivedJson archive = archives.iterator().next();
        Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/config", archive.getPath());
        compareJobConfig(originalJob, archive.getJson());
    }

    /**
     * Tests that the handler is registered under the single job config path.
     */
    @Test
    public void testGetPaths() {
        JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
        String[] paths = handler.getPaths();
        Assert.assertEquals(1, paths.length);
        Assert.assertEquals("/jobs/:jobid/config", paths[0]);
    }

    /**
     * Tests that the JSON generated for a job matches the job's configuration.
     */
    // Without @Test JUnit 4 silently skips this method, so the JSON generation was never exercised.
    @Test
    public void testJsonGeneration() throws Exception {
        AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
        String answer = JobConfigHandler.createJobConfigJson(originalJob);
        compareJobConfig(originalJob, answer);
    }

    /**
     * Compares the job id, the job name, the execution config and every user config entry of
     * {@code originalJob} with the values found in the JSON document {@code answer}.
     *
     * @param originalJob job the JSON was generated from
     * @param answer JSON document to check
     * @throws IOException if {@code answer} cannot be parsed
     */
    private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException {
        JsonNode job = ArchivedJobGenerationUtils.MAPPER.readTree(answer);

        Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText());
        Assert.assertEquals(originalJob.getJobName(), job.get("name").asText());

        ArchivedExecutionConfig originalConfig = originalJob.getArchivedExecutionConfig();
        JsonNode config = job.get("execution-config");

        Assert.assertEquals(originalConfig.getExecutionMode(), config.get("execution-mode").asText());
        Assert.assertEquals(originalConfig.getRestartStrategyDescription(), config.get("restart-strategy").asText());
        Assert.assertEquals(originalConfig.getParallelism(), config.get("job-parallelism").asInt());
        Assert.assertEquals(originalConfig.getObjectReuseEnabled(), config.get("object-reuse-mode").asBoolean());

        Map<String, String> originalUserConfig = originalConfig.getGlobalJobParameters();
        JsonNode userConfig = config.get("user-config");

        for (Map.Entry<String, String> originalEntry : originalUserConfig.entrySet()) {
            Assert.assertEquals(originalEntry.getValue(), userConfig.get(originalEntry.getKey()).asText());
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java new file mode 100644 index 0000000..afd743e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the JobDetailsHandler. 
+ */ +public class JobDetailsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobDetailsHandler.JobDetailsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(2, archives.size()); + + Iterator<ArchivedJson> iterator = archives.iterator(); + ArchivedJson archive1 = iterator.next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID(), archive1.getPath()); + compareJobDetails(originalJob, archive1.getJson()); + + ArchivedJson archive2 = iterator.next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices", archive2.getPath()); + compareJobDetails(originalJob, archive2.getJson()); + } + + @Test + public void testGetPaths() { + JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + String[] paths = handler.getPaths(); + Assert.assertEquals(2, paths.length); + List<String> pathsList = Lists.newArrayList(paths); + Assert.assertTrue(pathsList.contains("/jobs/:jobid")); + Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices")); + } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + String json = JobDetailsHandler.createJobDetailsJson(originalJob, null); + + compareJobDetails(originalJob, json); + } + + private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException { + JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); + + Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText()); + Assert.assertEquals(originalJob.getJobName(), result.get("name").asText()); + Assert.assertEquals(originalJob.isStoppable(), result.get("isStoppable").asBoolean()); + Assert.assertEquals(originalJob.getState().name(), 
result.get("state").asText()); + + Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), result.get("start-time").asLong()); + Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), result.get("end-time").asLong()); + Assert.assertEquals( + originalJob.getStatusTimestamp(originalJob.getState()) - originalJob.getStatusTimestamp(JobStatus.CREATED), + result.get("duration").asLong() + ); + + JsonNode timestamps = result.get("timestamps"); + for (JobStatus status : JobStatus.values()) { + Assert.assertEquals(originalJob.getStatusTimestamp(status), timestamps.get(status.name()).asLong()); + } + + ArrayNode tasks = (ArrayNode) result.get("vertices"); + int x = 0; + for (AccessExecutionJobVertex expectedTask : originalJob.getVerticesTopologically()) { + JsonNode task = tasks.get(x); + + Assert.assertEquals(expectedTask.getJobVertexId().toString(), task.get("id").asText()); + Assert.assertEquals(expectedTask.getName(), task.get("name").asText()); + Assert.assertEquals(expectedTask.getParallelism(), task.get("parallelism").asInt()); + Assert.assertEquals(expectedTask.getAggregateState().name(), task.get("status").asText()); + + Assert.assertEquals(3, task.get("start-time").asLong()); + Assert.assertEquals(5, task.get("end-time").asLong()); + Assert.assertEquals(2, task.get("duration").asLong()); + + JsonNode subtasksPerState = task.get("tasks"); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CREATED.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.RUNNING.name()).asInt()); + Assert.assertEquals(1, subtasksPerState.get(ExecutionState.FINISHED.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELING.name()).asInt()); + Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.CANCELED.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.FAILED.name()).asInt()); + + long expectedNumBytesIn = 0; + long expectedNumBytesOut = 0; + long expectedNumRecordsIn = 0; + long expectedNumRecordsOut = 0; + + for (AccessExecutionVertex vertex : expectedTask.getTaskVertices()) { + IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); + + expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); + expectedNumBytesOut += ioMetrics.getNumBytesOut(); + expectedNumRecordsIn += ioMetrics.getNumRecordsIn(); + expectedNumRecordsOut += ioMetrics.getNumRecordsOut(); + } + + JsonNode metrics = task.get("metrics"); + + Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong()); + Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong()); + Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong()); + Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong()); + + x++; + } + Assert.assertEquals(1, tasks.size()); + + JsonNode statusCounts = result.get("status-counts"); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt()); + Assert.assertEquals(1, statusCounts.get(ExecutionState.RUNNING.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.FINISHED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt()); + + Assert.assertEquals(ArchivedJobGenerationUtils.MAPPER.readTree(originalJob.getJsonPlan()), result.get("plan")); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java new file mode 100644 index 0000000..6a20696 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.ExceptionUtils;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;

import static org.mockito.Mockito.mock;

/**
 * Tests for the JobExceptionsHandler.
 */
public class JobExceptionsHandlerTest {

    /**
     * Tests that the archivist produces exactly one archive, stored under the job's exceptions
     * path, whose JSON matches the archived job.
     */
    @Test
    public void testArchiver() throws Exception {
        JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist();
        AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();

        Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
        Assert.assertEquals(1, archives.size());

        ArchivedJson archive = archives.iterator().next();
        Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath());
        compareExceptions(originalJob, archive.getJson());
    }

    /**
     * Tests that the handler is registered under the single job exceptions path.
     */
    @Test
    public void testGetPaths() {
        JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
        String[] paths = handler.getPaths();
        Assert.assertEquals(1, paths.length);
        Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
    }

    /**
     * Tests that the JSON generated for a job matches the job's failure information.
     */
    @Test
    public void testJsonGeneration() throws Exception {
        AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
        String json = JobExceptionsHandler.createJobExceptionsJson(originalJob);

        compareExceptions(originalJob, json);
    }

    /**
     * Compares the root exception, its timestamp, the per-subtask exception entries and the
     * truncation flag of {@code json} with {@code originalJob}.
     *
     * <p>Subtasks whose failure cause equals {@link ExceptionUtils#STRINGIFIED_NULL_EXCEPTION} are
     * skipped. The remaining subtasks are matched, in iteration order, against consecutive
     * entries of the {@code all-exceptions} array.
     *
     * @param originalJob job the JSON was generated from
     * @param json JSON document to check
     * @throws IOException if {@code json} cannot be parsed
     */
    private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
        JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);

        Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
        Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());

        ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");

        int reportedExceptions = 0;
        for (AccessExecutionVertex expectedSubtask : originalJob.getAllExecutionVertices()) {
            if (!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
                JsonNode exception = exceptions.get(reportedExceptions);

                Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText());
                Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), exception.get("timestamp").asLong());
                Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText());

                TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
                String expectedLocationString = location.getFQDNHostname() + ':' + location.dataPort();
                Assert.assertEquals(expectedLocationString, exception.get("location").asText());

                // Advance only for subtasks that are compared against an entry. Counting skipped
                // subtasks as well would shift the index past the matching JSON entry.
                reportedExceptions++;
            }
        }
        // NOTE(review): assumes the handler sets "truncated" once the number of reported exceptions
        // exceeds the limit - confirm against JobExceptionsHandler.
        Assert.assertEquals(reportedExceptions > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, result.get("truncated").asBoolean());
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java new file mode 100644 index 0000000..03ddb73 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the JobManagerConfigHandler. 
+ */ +public class JobManagerConfigHandlerTest { + @Test + public void testGetPaths() { + JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobmanager/config", paths[0]); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java new file mode 100644 index 0000000..6d3b213 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;

import org.junit.Assert;
import org.junit.Test;

import java.util.Collection;

import static org.mockito.Mockito.mock;

/**
 * Tests for the JobPlanHandler.
 */
public class JobPlanHandlerTest {

    /**
     * The archivist must emit a single archive for the test job, located at
     * {@code /jobs/<jobid>/plan} and carrying the job's JSON plan unchanged.
     */
    @Test
    public void testArchiver() throws Exception {
        JsonArchivist planArchivist = new JobPlanHandler.JobPlanJsonArchivist();
        AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();

        Collection<ArchivedJson> archivedPlans = planArchivist.archiveJsonWithPath(job);
        Assert.assertEquals(1, archivedPlans.size());

        ArchivedJson archivedPlan = archivedPlans.iterator().next();
        String expectedPath = "/jobs/" + job.getJobID() + "/plan";
        Assert.assertEquals(expectedPath, archivedPlan.getPath());
        Assert.assertEquals(job.getJsonPlan(), archivedPlan.getJson());
    }

    /** The handler must register exactly one path: {@code /jobs/:jobid/plan}. */
    @Test
    public void testGetPaths() {
        String[] registeredPaths =
            new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()).getPaths();

        Assert.assertArrayEquals(new String[] {"/jobs/:jobid/plan"}, registeredPaths);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import org.junit.Assert;
import org.junit.Test;

import java.util.List;

/**
 * Tests for the JobStoppingHandler.
 */
public class JobStoppingHandlerTest extends TestLogger {

    /** The handler must register exactly two paths: the plain and the YARN stop endpoints. */
    @Test
    public void testGetPaths() {
        List<String> registeredPaths = Lists.newArrayList(
            new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT()).getPaths());

        Assert.assertEquals(2, registeredPaths.size());
        for (String expectedPath : new String[] {"/jobs/:jobid/stop", "/jobs/:jobid/yarn-stop"}) {
            Assert.assertTrue(registeredPaths.contains(expectedPath));
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;

import static org.mockito.Mockito.mock;

/**
 * Tests for the JobVertexAccumulatorsHandler.
 */
public class JobVertexAccumulatorsHandlerTest {

    /**
     * The archivist must emit a single archive for the test job, located at
     * {@code /jobs/<jobid>/vertices/<vertexid>/accumulators}, whose JSON matches the test task.
     */
    @Test
    public void testArchiver() throws Exception {
        JsonArchivist accumulatorArchivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
        AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();

        Collection<ArchivedJson> archivedAccumulators = accumulatorArchivist.archiveJsonWithPath(job);
        Assert.assertEquals(1, archivedAccumulators.size());

        ArchivedJson archived = archivedAccumulators.iterator().next();
        String expectedPath = "/jobs/" + job.getJobID() + "/vertices/" + task.getJobVertexId() + "/accumulators";
        Assert.assertEquals(expectedPath, archived.getPath());
        compareAccumulators(task, archived.getJson());
    }

    /** The handler must register exactly one path: the per-vertex accumulators endpoint. */
    @Test
    public void testGetPaths() {
        String[] registeredPaths =
            new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()).getPaths();

        Assert.assertArrayEquals(new String[] {"/jobs/:jobid/vertices/:vertexid/accumulators"}, registeredPaths);
    }

    /** Verifies the JSON produced directly by {@code createVertexAccumulatorsJson} for the test task. */
    @Test
    public void testJsonGeneration() throws Exception {
        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();

        compareAccumulators(task, JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(task));
    }

    /**
     * Checks that the JSON carries the task's vertex id and its aggregated user accumulators.
     *
     * @param originalTask task the JSON was generated from
     * @param json JSON document to verify
     * @throws IOException if the JSON cannot be parsed
     */
    private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
        JsonNode root = ArchivedJobGenerationUtils.MAPPER.readTree(json);

        String expectedId = originalTask.getJobVertexId().toString();
        Assert.assertEquals(expectedId, root.get("id").asText());

        ArrayNode actualAccumulators = (ArrayNode) root.get("user-accumulators");
        StringifiedAccumulatorResult[] expectedAccumulators = originalTask.getAggregatedUserAccumulatorsStringified();

        ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccumulators, actualAccumulators);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.Optional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for back pressure handler responses.
 */
public class JobVertexBackPressureHandlerTest {

    /** Checks that the handler registers exactly one path, the per-vertex back pressure endpoint. */
    @Test
    public void testGetPaths() {
        JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
        String[] paths = handler.getPaths();
        Assert.assertEquals(1, paths.length);
        Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
    }

    /** Tests the response when no stats are available. */
    @Test
    public void testResponseNoStatsAvailable() throws Exception {
        BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);

        when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
            .thenReturn(Optional.empty());

        JsonNode rootNode = requestBackPressure(statsTracker, 9999);

        // Single element: only the status is reported
        assertEquals(1, rootNode.size());

        // Status
        JsonNode status = rootNode.get("status");
        assertNotNull(status);
        assertEquals("deprecated", status.textValue());

        // Without stats a sample must be triggered
        verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
    }

    /** Tests the response when stats are available. */
    @Test
    public void testResponseStatsAvailable() throws Exception {
        BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
        OperatorBackPressureStats stats = createStats();

        when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
            .thenReturn(Optional.of(stats));

        JsonNode rootNode = requestBackPressure(statsTracker, 9999);

        assertStatsResponse(rootNode, stats, "ok");

        // Verify not triggered
        verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
    }

    /** Tests that after the refresh interval another sample is triggered. */
    @Test
    public void testResponsePassedRefreshInterval() throws Exception {
        BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
        OperatorBackPressureStats stats = createStats();

        when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
            .thenReturn(Optional.of(stats));

        // A refresh interval of 0 should fire immediately
        JsonNode rootNode = requestBackPressure(statsTracker, 0);

        // Interval passed, hence deprecated
        assertStatsResponse(rootNode, stats, "deprecated");

        // Verify triggered
        verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
    }

    /** Creates stats ending at the current time with four subtask ratios. */
    private static OperatorBackPressureStats createStats() {
        return new OperatorBackPressureStats(
            0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
    }

    /**
     * Sends a request for a mocked job vertex through a handler backed by the given tracker and
     * returns the parsed JSON response.
     *
     * @param statsTracker tracker the handler reads stats from
     * @param refreshInterval refresh interval passed to the handler
     * @return root node of the JSON response
     * @throws Exception if the request fails or the response cannot be parsed
     */
    private static JsonNode requestBackPressure(BackPressureStatsTracker statsTracker, int refreshInterval) throws Exception {
        ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);

        JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
            mock(ExecutionGraphHolder.class),
            Executors.directExecutor(),
            statsTracker,
            refreshInterval);

        String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();

        return new ObjectMapper().readTree(response);
    }

    /**
     * Verifies a response that carries stats: the status, the overall back pressure level, the
     * end timestamp and one entry per subtask.
     *
     * @param rootNode root node of the JSON response
     * @param stats stats the response was generated from
     * @param expectedStatus expected value of the {@code status} field
     */
    private static void assertStatsResponse(JsonNode rootNode, OperatorBackPressureStats stats, String expectedStatus) {
        // Four elements: status, backpressure-level, end-timestamp, subtasks
        assertEquals(4, rootNode.size());

        // Status
        JsonNode status = rootNode.get("status");
        assertNotNull(status);
        assertEquals(expectedStatus, status.textValue());

        // Back pressure level
        JsonNode backPressureLevel = rootNode.get("backpressure-level");
        assertNotNull(backPressureLevel);
        assertEquals("high", backPressureLevel.textValue());

        // End time stamp
        JsonNode endTimeStamp = rootNode.get("end-timestamp");
        assertNotNull(endTimeStamp);
        assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());

        // Subtasks
        JsonNode subTasks = rootNode.get("subtasks");
        assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
        for (int i = 0; i < subTasks.size(); i++) {
            JsonNode subTask = subTasks.get(i);

            JsonNode index = subTask.get("subtask");
            assertEquals(i, index.intValue());

            JsonNode level = subTask.get("backpressure-level");
            assertEquals(JobVertexBackPressureHandler
                .getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());

            JsonNode ratio = subTask.get("ratio");
            assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;

import static org.mockito.Mockito.mock;

/**
 * Tests for the JobVertexDetailsHandler.
 */
public class JobVertexDetailsHandlerTest {

    /**
     * The archivist must emit a single archive for the test job, located at
     * {@code /jobs/<jobid>/vertices/<vertexid>}, whose JSON matches the test task.
     */
    @Test
    public void testArchiver() throws Exception {
        JsonArchivist detailsArchivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
        AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();

        Collection<ArchivedJson> archivedDetails = detailsArchivist.archiveJsonWithPath(job);
        Assert.assertEquals(1, archivedDetails.size());

        ArchivedJson archived = archivedDetails.iterator().next();
        String expectedPath = "/jobs/" + job.getJobID() + "/vertices/" + task.getJobVertexId();
        Assert.assertEquals(expectedPath, archived.getPath());
        compareVertexDetails(task, archived.getJson());
    }

    /** The handler must register exactly one path: {@code /jobs/:jobid/vertices/:vertexid}. */
    @Test
    public void testGetPaths() {
        String[] registeredPaths =
            new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null).getPaths();

        Assert.assertArrayEquals(new String[] {"/jobs/:jobid/vertices/:vertexid"}, registeredPaths);
    }

    /** Verifies the JSON produced directly by {@code createVertexDetailsJson} for the test task. */
    @Test
    public void testJsonGeneration() throws Exception {
        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
        String jobId = ArchivedJobGenerationUtils.getTestJob().getJobID().toString();

        compareVertexDetails(task, JobVertexDetailsHandler.createVertexDetailsJson(task, jobId, null));
    }

    /**
     * Checks the vertex-level fields of the JSON and then every entry of its {@code subtasks}
     * array against the corresponding task vertex.
     *
     * @param originalTask task the JSON was generated from
     * @param json JSON document to verify
     * @throws IOException if the JSON cannot be parsed
     */
    private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException {
        JsonNode root = ArchivedJobGenerationUtils.MAPPER.readTree(json);

        Assert.assertEquals(originalTask.getJobVertexId().toString(), root.get("id").asText());
        Assert.assertEquals(originalTask.getName(), root.get("name").asText());
        Assert.assertEquals(originalTask.getParallelism(), root.get("parallelism").asInt());
        Assert.assertTrue(root.get("now").asLong() > 0);

        ArrayNode actualSubtasks = (ArrayNode) root.get("subtasks");
        AccessExecutionVertex[] expectedSubtasks = originalTask.getTaskVertices();

        Assert.assertEquals(expectedSubtasks.length, actualSubtasks.size());

        int subtaskIndex = 0;
        for (AccessExecutionVertex expected : expectedSubtasks) {
            JsonNode actual = actualSubtasks.get(subtaskIndex);

            Assert.assertEquals(subtaskIndex, actual.get("subtask").asInt());
            Assert.assertEquals(expected.getExecutionState().name(), actual.get("status").asText());
            Assert.assertEquals(expected.getCurrentExecutionAttempt().getAttemptNumber(), actual.get("attempt").asInt());

            TaskManagerLocation location = expected.getCurrentAssignedResourceLocation();
            Assert.assertEquals(location.getHostname() + ":" + location.dataPort(), actual.get("host").asText());

            long deployedAt = expected.getStateTimestamp(ExecutionState.DEPLOYING);
            long finishedAt = expected.getStateTimestamp(ExecutionState.FINISHED);
            Assert.assertEquals(deployedAt, actual.get("start-time").asLong());
            Assert.assertEquals(finishedAt, actual.get("end-time").asLong());
            Assert.assertEquals(finishedAt - deployedAt, actual.get("duration").asLong());

            ArchivedJobGenerationUtils.compareIoMetrics(expected.getCurrentExecutionAttempt().getIOMetrics(), actual.get("metrics"));

            subtaskIndex++;
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.TestLogger;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collection;

import static org.mockito.Mockito.mock;

/**
 * Tests for the JobVertexTaskManagersHandler.
 */
public class JobVertexTaskManagersHandlerTest extends TestLogger {

    /**
     * The archivist must emit a single archive for the test job, located at
     * {@code /jobs/<jobid>/vertices/<vertexid>/taskmanagers}, whose JSON matches the test task.
     */
    @Test
    public void testArchiver() throws Exception {
        JsonArchivist taskManagersArchivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
        AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
        AccessExecutionVertex subtask = ArchivedJobGenerationUtils.getTestSubtask();

        Collection<ArchivedJson> archivedTaskManagers = taskManagersArchivist.archiveJsonWithPath(job);
        Assert.assertEquals(1, archivedTaskManagers.size());

        ArchivedJson archived = archivedTaskManagers.iterator().next();
        String expectedPath = "/jobs/" + job.getJobID() + "/vertices/" + task.getJobVertexId() + "/taskmanagers";
        Assert.assertEquals(expectedPath, archived.getPath());
        compareVertexTaskManagers(task, subtask, archived.getJson());
    }

    /** The handler must register exactly one path: the per-vertex task managers endpoint. */
    @Test
    public void testGetPaths() {
        String[] registeredPaths =
            new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null).getPaths();

        Assert.assertArrayEquals(new String[] {"/jobs/:jobid/vertices/:vertexid/taskmanagers"}, registeredPaths);
    }

    /** Verifies the JSON produced directly by {@code createVertexDetailsByTaskManagerJson}. */
    @Test
    public void testJsonGeneration() throws Exception {
        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
        AccessExecutionVertex subtask = ArchivedJobGenerationUtils.getTestSubtask();
        String jobId = ArchivedJobGenerationUtils.getTestJob().getJobID().toString();

        String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(task, jobId, null);

        compareVertexTaskManagers(task, subtask, json);
    }

    /**
     * Checks the vertex-level fields of the JSON and the first entry of its {@code taskmanagers}
     * array: host, status, timings, per-state counts and the I/O metrics summed over all task
     * vertices.
     *
     * @param originalTask task the JSON was generated from
     * @param originalSubtask subtask whose assigned location is the expected host
     * @param json JSON document to verify
     * @throws IOException if the JSON cannot be parsed
     */
    private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException {
        JsonNode root = ArchivedJobGenerationUtils.MAPPER.readTree(json);

        Assert.assertEquals(originalTask.getJobVertexId().toString(), root.get("id").asText());
        Assert.assertEquals(originalTask.getName(), root.get("name").asText());
        Assert.assertTrue(root.get("now").asLong() > 0);

        ArrayNode taskManagerEntries = (ArrayNode) root.get("taskmanagers");
        JsonNode firstTaskManager = taskManagerEntries.get(0);

        TaskManagerLocation location = originalSubtask.getCurrentAssignedResourceLocation();
        Assert.assertEquals(location.getHostname() + ':' + location.dataPort(), firstTaskManager.get("host").asText());
        Assert.assertEquals(ExecutionState.FINISHED.name(), firstTaskManager.get("status").asText());

        Assert.assertEquals(3, firstTaskManager.get("start-time").asLong());
        Assert.assertEquals(5, firstTaskManager.get("end-time").asLong());
        Assert.assertEquals(2, firstTaskManager.get("duration").asLong());

        // Exactly one subtask is expected in FINISHED; every other checked state must count zero.
        JsonNode statusCounts = firstTaskManager.get("status-counts");
        ExecutionState[] checkedStates = {
            ExecutionState.CREATED,
            ExecutionState.SCHEDULED,
            ExecutionState.DEPLOYING,
            ExecutionState.RUNNING,
            ExecutionState.FINISHED,
            ExecutionState.CANCELING,
            ExecutionState.CANCELED,
            ExecutionState.FAILED
        };
        for (ExecutionState state : checkedStates) {
            int expectedCount = state == ExecutionState.FINISHED ? 1 : 0;
            Assert.assertEquals(expectedCount, statusCounts.get(state.name()).asInt());
        }

        long bytesIn = 0;
        long bytesOut = 0;
        long recordsIn = 0;
        long recordsOut = 0;

        for (AccessExecutionVertex taskVertex : originalTask.getTaskVertices()) {
            IOMetrics attemptMetrics = taskVertex.getCurrentExecutionAttempt().getIOMetrics();

            bytesIn += attemptMetrics.getNumBytesInLocal() + attemptMetrics.getNumBytesInRemote();
            bytesOut += attemptMetrics.getNumBytesOut();
            recordsIn += attemptMetrics.getNumRecordsIn();
            recordsOut += attemptMetrics.getNumRecordsOut();
        }

        JsonNode actualMetrics = firstTaskManager.get("metrics");

        Assert.assertEquals(bytesIn, actualMetrics.get("read-bytes").asLong());
        Assert.assertEquals(bytesOut, actualMetrics.get("write-bytes").asLong());
        Assert.assertEquals(recordsIn, actualMetrics.get("read-records").asLong());
        Assert.assertEquals(recordsOut, actualMetrics.get("write-records").asLong());
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the SubtaskCurrentAttemptDetailsHandler. + */ +public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { + @Test + public void testGetPaths() { + SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java new file mode 100644 index 0000000..49e54c0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the SubtaskExecutionAttemptAccumulatorsHandler. 
+ */ +public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals( + "/jobs/" + originalJob.getJobID() + + "/vertices/" + originalTask.getJobVertexId() + + "/subtasks/" + originalAttempt.getParallelSubtaskIndex() + + "/attempts/" + originalAttempt.getAttemptNumber() + + "/accumulators", + archive.getPath()); + compareAttemptAccumulators(originalAttempt, archive.getJson()); + } + + @Test + public void testGetPaths() { + SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]); + } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); + String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt); + + compareAttemptAccumulators(originalAttempt, json); + } + + private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException { + JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); + + 
Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt()); + Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt()); + Assert.assertEquals(originalAttempt.getAttemptId().toString(), result.get("id").asText()); + + ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(), (ArrayNode) result.get("user-accumulators")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java new file mode 100644 index 0000000..e1fe8b5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the SubtaskExecutionAttemptDetailsHandler. + */
+public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
+
+    /**
+     * Runs the JSON archivist on the test job and checks that it yields two entries: the first
+     * under the subtask path, the second under the subtask's numbered-attempt path, both
+     * carrying JSON that matches the test attempt.
+     */
+    @Test
+    public void testArchiver() throws Exception {
+        JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
+        AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
+        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
+        AccessExecution attempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+        Collection<ArchivedJson> archived = archivist.archiveJsonWithPath(job);
+        Assert.assertEquals(2, archived.size());
+
+        Iterator<ArchivedJson> entries = archived.iterator();
+
+        ArchivedJson subtaskEntry = entries.next();
+        // Both expected paths share this prefix; the second one appends the attempt number.
+        String subtaskPath = "/jobs/" + job.getJobID()
+            + "/vertices/" + task.getJobVertexId()
+            + "/subtasks/" + attempt.getParallelSubtaskIndex();
+        Assert.assertEquals(subtaskPath, subtaskEntry.getPath());
+        compareAttemptDetails(attempt, subtaskEntry.getJson());
+
+        ArchivedJson attemptEntry = entries.next();
+        String attemptPath = subtaskPath + "/attempts/" + attempt.getAttemptNumber();
+        Assert.assertEquals(attemptPath, attemptEntry.getPath());
+        compareAttemptDetails(attempt, attemptEntry.getJson());
+    }
+
+    /**
+     * Checks that the handler registers exactly one REST path and that it is the expected one.
+     */
+    @Test
+    public void testGetPaths() {
+        SubtaskExecutionAttemptDetailsHandler handler =
+            new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+
+        String[] registeredPaths = handler.getPaths();
+
+        Assert.assertEquals(1, registeredPaths.length);
+        Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", registeredPaths[0]);
+    }
+
+    /**
+     * Generates the JSON directly through the handler's static factory method and compares it
+     * with the test attempt.
+     */
+    @Test
+    public void testJsonGeneration() throws Exception {
+        AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
+        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
+        AccessExecution attempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+        String generated = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
+            attempt,
+            job.getJobID().toString(),
+            task.getJobVertexId().toString(),
+            null);
+
+        compareAttemptDetails(attempt, generated);
+    }
+
+    /**
+     * Parses {@code json} and asserts that subtask index, status, attempt number, host, start
+     * time (DEPLOYING timestamp), end time (FINISHED timestamp), duration and I/O metrics match
+     * {@code originalAttempt}.
+     *
+     * @param originalAttempt execution attempt the JSON was generated from
+     * @param json JSON document to verify
+     * @throws IOException if the JSON cannot be parsed
+     */
+    private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException {
+        JsonNode root = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+        Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), root.get("subtask").asInt());
+        Assert.assertEquals(originalAttempt.getState().name(), root.get("status").asText());
+        Assert.assertEquals(originalAttempt.getAttemptNumber(), root.get("attempt").asInt());
+        Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), root.get("host").asText());
+
+        long deployedAt = originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+        Assert.assertEquals(deployedAt, root.get("start-time").asLong());
+
+        long finishedAt = originalAttempt.getStateTimestamp(ExecutionState.FINISHED);
+        Assert.assertEquals(finishedAt, root.get("end-time").asLong());
+
+        long expectedDuration = finishedAt - deployedAt;
+        Assert.assertEquals(expectedDuration, root.get("duration").asLong());
+
+        ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), root.get("metrics"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java new file mode 100644 index 0000000..1478f00 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the SubtasksAllAccumulatorsHandler. + */
+public class SubtasksAllAccumulatorsHandlerTest extends TestLogger {
+
+    /**
+     * Runs the JSON archivist on the test job and checks that it yields exactly one entry,
+     * stored under the vertex's {@code subtasks/accumulators} path, whose JSON matches the
+     * test task.
+     */
+    @Test
+    public void testArchiver() throws Exception {
+        JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
+        AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
+        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
+
+        Collection<ArchivedJson> archived = archivist.archiveJsonWithPath(job);
+        Assert.assertEquals(1, archived.size());
+
+        ArchivedJson entry = archived.iterator().next();
+        String expectedPath = "/jobs/" + job.getJobID() + "/vertices/" + task.getJobVertexId() + "/subtasks/accumulators";
+        Assert.assertEquals(expectedPath, entry.getPath());
+        compareSubtaskAccumulators(task, entry.getJson());
+    }
+
+    /**
+     * Checks that the handler registers exactly one REST path and that it is the expected one.
+     */
+    @Test
+    public void testGetPaths() {
+        SubtasksAllAccumulatorsHandler handler =
+            new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+
+        String[] registeredPaths = handler.getPaths();
+
+        Assert.assertEquals(1, registeredPaths.length);
+        Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", registeredPaths[0]);
+    }
+
+    /**
+     * Generates the JSON directly through the handler's static factory method and compares it
+     * with the test task.
+     */
+    @Test
+    public void testJsonGeneration() throws Exception {
+        AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
+
+        String generated = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(task);
+
+        compareSubtaskAccumulators(task, generated);
+    }
+
+    /**
+     * Parses {@code json} and asserts that vertex id, parallelism and, for every subtask in
+     * index order, the subtask index, attempt number, host and stringified user accumulators
+     * match {@code originalTask}.
+     *
+     * @param originalTask job vertex the JSON was generated from
+     * @param json JSON document to verify
+     * @throws IOException if the JSON cannot be parsed
+     */
+    private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
+        JsonNode root = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+        Assert.assertEquals(originalTask.getJobVertexId().toString(), root.get("id").asText());
+        Assert.assertEquals(originalTask.getParallelism(), root.get("parallelism").asInt());
+
+        ArrayNode subtaskNodes = (ArrayNode) root.get("subtasks");
+        AccessExecutionVertex[] expectedVertices = originalTask.getTaskVertices();
+
+        Assert.assertEquals(expectedVertices.length, subtaskNodes.size());
+
+        int index = 0;
+        for (AccessExecutionVertex expectedVertex : expectedVertices) {
+            JsonNode subtaskNode = subtaskNodes.get(index);
+
+            // The JSON array is expected to list subtasks in the same order as the vertex array.
+            Assert.assertEquals(index, subtaskNode.get("subtask").asInt());
+            Assert.assertEquals(expectedVertex.getCurrentExecutionAttempt().getAttemptNumber(), subtaskNode.get("attempt").asInt());
+            Assert.assertEquals(expectedVertex.getCurrentAssignedResourceLocation().getHostname(), subtaskNode.get("host").asText());
+
+            ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+                expectedVertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified(),
+                (ArrayNode) subtaskNode.get("user-accumulators"));
+
+            index++;
+        }
+    }
+}
