http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java deleted file mode 100644 index e34631e..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.util.TestLogger; - -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests for the JobCancellationWithSavepointHandler. - */ -public class JobCancellationWithSavepointHandlersTest extends TestLogger { - - private static final Executor executor = Executors.directExecutor(); - - @Test - public void testGetPaths() { - JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor); - - JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler(); - String[] triggerPaths = triggerHandler.getPaths(); - Assert.assertEquals(2, triggerPaths.length); - List<String> triggerPathsList = Arrays.asList(triggerPaths); - Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint")); - Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory")); - - JobCancellationWithSavepointHandlers.InProgressHandler progressHandler = handler.getInProgressHandler(); - String[] progressPaths = progressHandler.getPaths(); - Assert.assertEquals(1, progressPaths.length); - Assert.assertEquals("/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId", progressPaths[0]); - } - - /** - * Tests that the cancellation ask timeout respects the checkpoint timeout. - * Otherwise, AskTimeoutExceptions are bound to happen for large state. - */ - @Test - public void testAskTimeoutEqualsCheckpointTimeout() throws Exception { - long timeout = 128288238L; - JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); - ExecutionGraph graph = mock(ExecutionGraph.class); - CheckpointCoordinator coord = mock(CheckpointCoordinator.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); - when(graph.getCheckpointCoordinator()).thenReturn(coord); - when(coord.getCheckpointTimeout()).thenReturn(timeout); - - JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); - JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler(); - - Map<String, String> params = new HashMap<>(); - params.put("jobid", jobId.toString()); - params.put("targetDirectory", "placeholder"); - - JobManagerGateway jobManager = mock(JobManagerGateway.class); - when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar")); - - handler.handleRequest(params, Collections.emptyMap(), jobManager); - - verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class)); - } - - /** - * Tests that the savepoint directory configuration is respected. - */ - @Test - public void testSavepointDirectoryConfiguration() throws Exception { - long timeout = 128288238L; - JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); - ExecutionGraph graph = mock(ExecutionGraph.class); - CheckpointCoordinator coord = mock(CheckpointCoordinator.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); - when(graph.getCheckpointCoordinator()).thenReturn(coord); - when(coord.getCheckpointTimeout()).thenReturn(timeout); - - JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory"); - JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler(); - - Map<String, String> params = new HashMap<>(); - params.put("jobid", jobId.toString()); - - JobManagerGateway jobManager = mock(JobManagerGateway.class); - when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar")); - - // 1. Use targetDirectory path param - params.put("targetDirectory", "custom-directory"); - handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); - - verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); - - // 2. Use default - params.remove("targetDirectory"); - - handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); - - verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class)); - - // 3. Throw Exception - handlers = new JobCancellationWithSavepointHandlers(holder, executor, null); - handler = handlers.getTriggerHandler(); - - try { - handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); - fail("Did not throw expected test Exception"); - } catch (Exception e) { - IllegalStateException cause = (IllegalStateException) e.getCause(); - assertEquals(true, cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key())); - } - } - - /** - * Tests triggering a new request and monitoring it. - */ - @Test - public void testTriggerNewRequest() throws Exception { - JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); - ExecutionGraph graph = mock(ExecutionGraph.class); - CheckpointCoordinator coord = mock(CheckpointCoordinator.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); - when(graph.getCheckpointCoordinator()).thenReturn(coord); - - JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); - JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler(); - JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler(); - - Map<String, String> params = new HashMap<>(); - params.put("jobid", jobId.toString()); - params.put("targetDirectory", "custom-directory"); - - JobManagerGateway jobManager = mock(JobManagerGateway.class); - - // Successful - CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>(); - when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint); - - // Trigger - FullHttpResponse response = trigger.handleRequest(params, Collections.emptyMap(), jobManager).get(); - - verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); - - String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId); - - assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); - assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); - assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION)); - - String json = response.content().toString(Charset.forName("UTF-8")); - JsonNode root = new ObjectMapper().readTree(json); - - assertEquals("accepted", root.get("status").asText()); - assertEquals("1", root.get("request-id").asText()); - assertEquals(location, root.get("location").asText()); - - // Trigger again - response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); - assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); - assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); - assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION)); - - json = response.content().toString(Charset.forName("UTF-8")); - root = new ObjectMapper().readTree(json); - - assertEquals("accepted", root.get("status").asText()); - assertEquals("1", root.get("request-id").asText()); - assertEquals(location, root.get("location").asText()); - - // Only single actual request - verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); - - // Query progress - params.put("requestId", "1"); - - response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); - assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); - assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); - - json = response.content().toString(Charset.forName("UTF-8")); - root = new ObjectMapper().readTree(json); - - assertEquals("in-progress", root.get("status").asText()); - assertEquals("1", root.get("request-id").asText()); - - // Complete - successfulCancelWithSavepoint.complete("_path-savepoint_"); - - response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); - - assertEquals(HttpResponseStatus.CREATED, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); - assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); - - json = response.content().toString(Charset.forName("UTF-8")); - - root = new ObjectMapper().readTree(json); - - assertEquals("success", root.get("status").asText()); - assertEquals("1", root.get("request-id").asText()); - assertEquals("_path-savepoint_", root.get("savepoint-path").asText()); - - // Query again, keep recent history - - response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); - - assertEquals(HttpResponseStatus.CREATED, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); - assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); - - json = response.content().toString(Charset.forName("UTF-8")); - - root = new ObjectMapper().readTree(json); - - assertEquals("success", root.get("status").asText()); - assertEquals("1", root.get("request-id").asText()); - assertEquals("_path-savepoint_", root.get("savepoint-path").asText()); - - // Query for unknown request - params.put("requestId", "9929"); - - response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); - assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); - assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); - - json = response.content().toString(Charset.forName("UTF-8")); - - root = new ObjectMapper().readTree(json); - - assertEquals("failed", root.get("status").asText()); - assertEquals("9929", root.get("request-id").asText()); - assertEquals("Unknown job/request ID", root.get("cause").asText()); - } - - /** - * Tests response when a request fails. - */ - @Test - public void testFailedCancellation() throws Exception { - JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); - ExecutionGraph graph = mock(ExecutionGraph.class); - CheckpointCoordinator coord = mock(CheckpointCoordinator.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); - when(graph.getCheckpointCoordinator()).thenReturn(coord); - - JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); - JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler(); - JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler(); - - Map<String, String> params = new HashMap<>(); - params.put("jobid", jobId.toString()); - params.put("targetDirectory", "custom-directory"); - - JobManagerGateway jobManager = mock(JobManagerGateway.class); - - // Successful - CompletableFuture<String> unsuccessfulCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception")); - when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint); - - // Trigger - trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); - verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); - - // Query progress - params.put("requestId", "1"); - - FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get(); - assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); - assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); - - String json = response.content().toString(Charset.forName("UTF-8")); - JsonNode root = new ObjectMapper().readTree(json); - - assertEquals("failed", root.get("status").asText()); - assertEquals("1", root.get("request-id").asText()); - assertEquals("Test Exception", root.get("cause").asText()); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java deleted file mode 100644 index 1c08ae8..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.ArchivedExecutionConfig; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the JobConfigHandler. - */ -public class JobConfigHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new JobConfigHandler.JobConfigJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/config", archive.getPath()); - compareJobConfig(originalJob, archive.getJson()); - } - - @Test - public void testGetPaths() { - JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/config", paths[0]); - } - - public void testJsonGeneration() throws Exception { - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - String answer = JobConfigHandler.createJobConfigJson(originalJob); - compareJobConfig(originalJob, answer); - } - - private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException { - JsonNode job = ArchivedJobGenerationUtils.MAPPER.readTree(answer); - - Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText()); - Assert.assertEquals(originalJob.getJobName(), job.get("name").asText()); - - ArchivedExecutionConfig originalConfig = originalJob.getArchivedExecutionConfig(); - JsonNode config = job.get("execution-config"); - - Assert.assertEquals(originalConfig.getExecutionMode(), config.get("execution-mode").asText()); - Assert.assertEquals(originalConfig.getRestartStrategyDescription(), config.get("restart-strategy").asText()); - Assert.assertEquals(originalConfig.getParallelism(), config.get("job-parallelism").asInt()); - Assert.assertEquals(originalConfig.getObjectReuseEnabled(), config.get("object-reuse-mode").asBoolean()); - - Map<String, String> originalUserConfig = originalConfig.getGlobalJobParameters(); - JsonNode userConfig = config.get("user-config"); - - for (Map.Entry<String, String> originalEntry : originalUserConfig.entrySet()) { - Assert.assertEquals(originalEntry.getValue(), userConfig.get(originalEntry.getKey()).asText()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java deleted file mode 100644 index ee0498e..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the JobDetailsHandler. - */ -public class JobDetailsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new JobDetailsHandler.JobDetailsJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(2, archives.size()); - - Iterator<ArchivedJson> iterator = archives.iterator(); - ArchivedJson archive1 = iterator.next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID(), archive1.getPath()); - compareJobDetails(originalJob, archive1.getJson()); - - ArchivedJson archive2 = iterator.next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices", archive2.getPath()); - compareJobDetails(originalJob, archive2.getJson()); - } - - @Test - public void testGetPaths() { - JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); - String[] paths = handler.getPaths(); - Assert.assertEquals(2, paths.length); - List<String> pathsList = Lists.newArrayList(paths); - Assert.assertTrue(pathsList.contains("/jobs/:jobid")); - Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices")); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - String json = JobDetailsHandler.createJobDetailsJson(originalJob, null); - - compareJobDetails(originalJob, json); - } - - private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText()); - Assert.assertEquals(originalJob.getJobName(), result.get("name").asText()); - Assert.assertEquals(originalJob.isStoppable(), result.get("isStoppable").asBoolean()); - Assert.assertEquals(originalJob.getState().name(), result.get("state").asText()); - - Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), result.get("start-time").asLong()); - Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), result.get("end-time").asLong()); - Assert.assertEquals( - originalJob.getStatusTimestamp(originalJob.getState()) - originalJob.getStatusTimestamp(JobStatus.CREATED), - result.get("duration").asLong() - ); - - JsonNode timestamps = result.get("timestamps"); - for (JobStatus status : JobStatus.values()) { - Assert.assertEquals(originalJob.getStatusTimestamp(status), timestamps.get(status.name()).asLong()); - } - - ArrayNode tasks = (ArrayNode) result.get("vertices"); - int x = 0; - for (AccessExecutionJobVertex expectedTask : originalJob.getVerticesTopologically()) { - JsonNode task = tasks.get(x); - - Assert.assertEquals(expectedTask.getJobVertexId().toString(), task.get("id").asText()); - Assert.assertEquals(expectedTask.getName(), task.get("name").asText()); - Assert.assertEquals(expectedTask.getParallelism(), task.get("parallelism").asInt()); - Assert.assertEquals(expectedTask.getAggregateState().name(), task.get("status").asText()); - - Assert.assertEquals(3, task.get("start-time").asLong()); - Assert.assertEquals(5, task.get("end-time").asLong()); - Assert.assertEquals(2, task.get("duration").asLong()); - - JsonNode subtasksPerState = task.get("tasks"); - Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CREATED.name()).asInt()); - Assert.assertEquals(0, subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt()); - Assert.assertEquals(0, subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt()); - Assert.assertEquals(0, subtasksPerState.get(ExecutionState.RUNNING.name()).asInt()); - Assert.assertEquals(1, subtasksPerState.get(ExecutionState.FINISHED.name()).asInt()); - Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELING.name()).asInt()); - Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELED.name()).asInt()); - Assert.assertEquals(0, subtasksPerState.get(ExecutionState.FAILED.name()).asInt()); - - long expectedNumBytesIn = 0; - long expectedNumBytesOut = 0; - long expectedNumRecordsIn = 0; - long expectedNumRecordsOut = 0; - - for (AccessExecutionVertex vertex : expectedTask.getTaskVertices()) { - IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); - - expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); - expectedNumBytesOut += ioMetrics.getNumBytesOut(); - expectedNumRecordsIn += ioMetrics.getNumRecordsIn(); - expectedNumRecordsOut += ioMetrics.getNumRecordsOut(); - } - - JsonNode metrics = task.get("metrics"); - - Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong()); - Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong()); - Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong()); - Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong()); - - x++; - } - Assert.assertEquals(1, tasks.size()); - - JsonNode statusCounts = result.get("status-counts"); - Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt()); - Assert.assertEquals(1, statusCounts.get(ExecutionState.RUNNING.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.FINISHED.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt()); - - Assert.assertEquals(ArchivedJobGenerationUtils.MAPPER.readTree(originalJob.getJsonPlan()), result.get("plan")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java deleted file mode 100644 index 6e0f918..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; -import org.apache.flink.util.ExceptionUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the JobExceptionsHandler. - */ -public class JobExceptionsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath()); - compareExceptions(originalJob, archive.getJson()); - } - - @Test - public void testGetPaths() { - JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - String json = JobExceptionsHandler.createJobExceptionsJson(originalJob); - - compareExceptions(originalJob, json); - } - - private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText()); - Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong()); - - ArrayNode exceptions = (ArrayNode) result.get("all-exceptions"); - - int x = 0; - for (AccessExecutionVertex expectedSubtask : originalJob.getAllExecutionVertices()) { - if (!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { - JsonNode exception = exceptions.get(x); - - Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText()); - Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), exception.get("timestamp").asLong()); - Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText()); - - TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation(); - String expectedLocationString = location.getFQDNHostname() + ':' + location.dataPort(); - Assert.assertEquals(expectedLocationString, exception.get("location").asText()); - } - x++; - } - Assert.assertEquals(x > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, result.get("truncated").asBoolean()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java deleted file mode 100644 index 94fd5a8..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Tests for the JobManagerConfigHandler. - */ -public class JobManagerConfigHandlerTest { - @Test - public void testGetPaths() { - JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobmanager/config", paths[0]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java deleted file mode 100644 index 4a934ec..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the JobPlanHandler. - */ -public class JobPlanHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new JobPlanHandler.JobPlanJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/plan", archive.getPath()); - Assert.assertEquals(originalJob.getJsonPlan(), archive.getJson()); - } - - @Test - public void testGetPaths() { - JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/plan", paths[0]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java deleted file mode 100644 index 8c05c83..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.TestLogger; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.List; - -/** - * Tests for the JobStoppingHandler. - */ -public class JobStoppingHandlerTest extends TestLogger { - @Test - public void testGetPaths() { - JobStoppingHandler handler = new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT()); - String[] paths = handler.getPaths(); - Assert.assertEquals(2, paths.length); - List<String> pathsList = Lists.newArrayList(paths); - Assert.assertTrue(pathsList.contains("/jobs/:jobid/stop")); - Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-stop")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java deleted file mode 100644 index 5af9aa6..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the JobVertexAccumulatorsHandler. - */ -public class JobVertexAccumulatorsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/accumulators", archive.getPath()); - compareAccumulators(originalTask, archive.getJson()); - } - - @Test - public void testGetPaths() { - JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask); - - compareAccumulators(originalTask, json); - } - - private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); - - ArrayNode accs = (ArrayNode) result.get("user-accumulators"); - StringifiedAccumulatorResult[] expectedAccs = originalTask.getAggregatedUserAccumulatorsStringified(); - - ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java deleted file mode 100644 index 0d15e08..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Collections; - -import scala.Option; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests for back pressure handler responses. - */ -public class JobVertexBackPressureHandlerTest { - @Test - public void testGetPaths() { - JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]); - } - - /** Tests the response when no stats are available. */ - @Test - public void testResponseNoStatsAvailable() throws Exception { - ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class); - BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class); - - when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class))) - .thenReturn(Option.<OperatorBackPressureStats>empty()); - - JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), - Executors.directExecutor(), - statsTracker, - 9999); - - String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get(); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(response); - - // Single element - assertEquals(1, rootNode.size()); - - // Status - JsonNode status = rootNode.get("status"); - assertNotNull(status); - assertEquals("deprecated", status.textValue()); - - verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class)); - } - - /** Tests the response when stats are available. */ - @Test - public void testResponseStatsAvailable() throws Exception { - ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class); - BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class); - - OperatorBackPressureStats stats = new OperatorBackPressureStats( - 0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 }); - - when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class))) - .thenReturn(Option.apply(stats)); - - JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), - Executors.directExecutor(), - statsTracker, - 9999); - - String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get(); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(response); - - // Single element - assertEquals(4, rootNode.size()); - - // Status - JsonNode status = rootNode.get("status"); - assertNotNull(status); - assertEquals("ok", status.textValue()); - - // Back pressure level - JsonNode backPressureLevel = rootNode.get("backpressure-level"); - assertNotNull(backPressureLevel); - assertEquals("high", backPressureLevel.textValue()); - - // End time stamp - JsonNode endTimeStamp = rootNode.get("end-timestamp"); - assertNotNull(endTimeStamp); - assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue()); - - // Subtasks - JsonNode subTasks = rootNode.get("subtasks"); - assertEquals(stats.getNumberOfSubTasks(), subTasks.size()); - for (int i = 0; i < subTasks.size(); i++) { - JsonNode subTask = subTasks.get(i); - - JsonNode index = subTask.get("subtask"); - assertEquals(i, index.intValue()); - - JsonNode level = subTask.get("backpressure-level"); - assertEquals(JobVertexBackPressureHandler - .getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue()); - - JsonNode ratio = subTask.get("ratio"); - assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0); - } - - // Verify not triggered - verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class)); - } - - /** Tests that after the refresh interval another sample is triggered. */ - @Test - public void testResponsePassedRefreshInterval() throws Exception { - ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class); - BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class); - - OperatorBackPressureStats stats = new OperatorBackPressureStats( - 0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 }); - - when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class))) - .thenReturn(Option.apply(stats)); - - JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), - Executors.directExecutor(), - statsTracker, - 0); // <----- refresh interval should fire immediately - - String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get(); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(response); - - // Single element - assertEquals(4, rootNode.size()); - - // Status - JsonNode status = rootNode.get("status"); - assertNotNull(status); - // Interval passed, hence deprecated - assertEquals("deprecated", status.textValue()); - - // Back pressure level - JsonNode backPressureLevel = rootNode.get("backpressure-level"); - assertNotNull(backPressureLevel); - assertEquals("high", backPressureLevel.textValue()); - - // End time stamp - JsonNode endTimeStamp = rootNode.get("end-timestamp"); - assertNotNull(endTimeStamp); - assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue()); - - // Subtasks - JsonNode subTasks = rootNode.get("subtasks"); - assertEquals(stats.getNumberOfSubTasks(), subTasks.size()); - for (int i = 0; i < subTasks.size(); i++) { - JsonNode subTask = subTasks.get(i); - - JsonNode index = subTask.get("subtask"); - assertEquals(i, index.intValue()); - - JsonNode level = subTask.get("backpressure-level"); - assertEquals(JobVertexBackPressureHandler - .getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue()); - - JsonNode ratio = subTask.get("ratio"); - assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0); - } - - // Verify triggered - verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java deleted file mode 100644 index 1b8d9aa..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the JobVertexDetailsHandler. - */ -public class JobVertexDetailsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId(), archive.getPath()); - compareVertexDetails(originalTask, archive.getJson()); - } - - @Test - public void testGetPaths() { - JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - String json = JobVertexDetailsHandler.createVertexDetailsJson( - originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null); - - compareVertexDetails(originalTask, json); - } - - private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); - Assert.assertEquals(originalTask.getName(), result.get("name").asText()); - Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt()); - Assert.assertTrue(result.get("now").asLong() > 0); - - ArrayNode subtasks = (ArrayNode) result.get("subtasks"); - - Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size()); - for (int x = 0; x < originalTask.getTaskVertices().length; x++) { - AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x]; - JsonNode subtask = subtasks.get(x); - - Assert.assertEquals(x, subtask.get("subtask").asInt()); - Assert.assertEquals(expectedSubtask.getExecutionState().name(), subtask.get("status").asText()); - Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt()); - - TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation(); - String expectedLocationString = location.getHostname() + ":" + location.dataPort(); - Assert.assertEquals(expectedLocationString, subtask.get("host").asText()); - long start = expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING); - Assert.assertEquals(start, subtask.get("start-time").asLong()); - long end = expectedSubtask.getStateTimestamp(ExecutionState.FINISHED); - Assert.assertEquals(end, subtask.get("end-time").asLong()); - Assert.assertEquals(end - start, subtask.get("duration").asLong()); - - ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(), subtask.get("metrics")); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java deleted file mode 100644 index badb952..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the JobVertexTaskManagersHandler. - */ -public class JobVertexTaskManagersHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", archive.getPath()); - compareVertexTaskManagers(originalTask, originalSubtask, archive.getJson()); - } - - @Test - public void testGetPaths() { - JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask(); - String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson( - originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null); - - compareVertexTaskManagers(originalTask, originalSubtask, json); - } - - private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); - Assert.assertEquals(originalTask.getName(), result.get("name").asText()); - Assert.assertTrue(result.get("now").asLong() > 0); - - ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers"); - - JsonNode taskManager = taskmanagers.get(0); - - TaskManagerLocation location = originalSubtask.getCurrentAssignedResourceLocation(); - String expectedLocationString = location.getHostname() + ':' + location.dataPort(); - Assert.assertEquals(expectedLocationString, taskManager.get("host").asText()); - Assert.assertEquals(ExecutionState.FINISHED.name(), taskManager.get("status").asText()); - - Assert.assertEquals(3, taskManager.get("start-time").asLong()); - Assert.assertEquals(5, taskManager.get("end-time").asLong()); - Assert.assertEquals(2, taskManager.get("duration").asLong()); - - JsonNode statusCounts = taskManager.get("status-counts"); - Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.RUNNING.name()).asInt()); - Assert.assertEquals(1, statusCounts.get(ExecutionState.FINISHED.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt()); - Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt()); - - long expectedNumBytesIn = 0; - long expectedNumBytesOut = 0; - long expectedNumRecordsIn = 0; - long expectedNumRecordsOut = 0; - - for (AccessExecutionVertex vertex : originalTask.getTaskVertices()) { - IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); - - expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); - expectedNumBytesOut += ioMetrics.getNumBytesOut(); - expectedNumRecordsIn += ioMetrics.getNumRecordsIn(); - expectedNumRecordsOut += ioMetrics.getNumRecordsOut(); - } - - JsonNode metrics = taskManager.get("metrics"); - - Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong()); - Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong()); - Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong()); - Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java deleted file mode 100644 index a80bac9..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; - -import org.junit.Assert; -import org.junit.Test; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the SubtaskCurrentAttemptDetailsHandler. - */ -public class SubtaskCurrentAttemptDetailsHandlerTest { - @Test - public void testGetPaths() { - SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java deleted file mode 100644 index 6773fd4..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the SubtaskExecutionAttemptAccumulatorsHandler. - */ -public class SubtaskExecutionAttemptAccumulatorsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals( - "/jobs/" + originalJob.getJobID() + - "/vertices/" + originalTask.getJobVertexId() + - "/subtasks/" + originalAttempt.getParallelSubtaskIndex() + - "/attempts/" + originalAttempt.getAttemptNumber() + - "/accumulators", - archive.getPath()); - compareAttemptAccumulators(originalAttempt, archive.getJson()); - } - - @Test - public void testGetPaths() { - SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); - String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt); - - compareAttemptAccumulators(originalAttempt, json); - } - - private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt()); - Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt()); - Assert.assertEquals(originalAttempt.getAttemptId().toString(), result.get("id").asText()); - - ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(), (ArrayNode) result.get("user-accumulators")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java deleted file mode 100644 index 7777d2d..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; -import java.util.Iterator; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the SubtaskExecutionAttemptDetailsHandler. - */ -public class SubtaskExecutionAttemptDetailsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(2, archives.size()); - - Iterator<ArchivedJson> iterator = archives.iterator(); - ArchivedJson archive1 = iterator.next(); - Assert.assertEquals( - "/jobs/" + originalJob.getJobID() + - "/vertices/" + originalTask.getJobVertexId() + - "/subtasks/" + originalAttempt.getParallelSubtaskIndex(), - archive1.getPath()); - compareAttemptDetails(originalAttempt, archive1.getJson()); - - ArchivedJson archive2 = iterator.next(); - Assert.assertEquals( - "/jobs/" + originalJob.getJobID() + - "/vertices/" + originalTask.getJobVertexId() + - "/subtasks/" + originalAttempt.getParallelSubtaskIndex() + - "/attempts/" + originalAttempt.getAttemptNumber(), - archive2.getPath()); - compareAttemptDetails(originalAttempt, archive2.getJson()); - } - - @Test - public void testGetPaths() { - SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); - String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson( - originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null); - - compareAttemptDetails(originalAttempt, json); - } - - private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt()); - Assert.assertEquals(originalAttempt.getState().name(), result.get("status").asText()); - Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt()); - Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), result.get("host").asText()); - long start = originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING); - Assert.assertEquals(start, result.get("start-time").asLong()); - long end = originalAttempt.getStateTimestamp(ExecutionState.FINISHED); - Assert.assertEquals(end, result.get("end-time").asLong()); - Assert.assertEquals(end - start, result.get("duration").asLong()); - - ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), result.get("metrics")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java deleted file mode 100644 index 7b400da..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the SubtasksAllAccumulatorsHandler. - */ -public class SubtasksAllAccumulatorsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + - "/subtasks/accumulators", archive.getPath()); - compareSubtaskAccumulators(originalTask, archive.getJson()); - } - - @Test - public void testGetPaths() { - SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask); - compareSubtaskAccumulators(originalTask, json); - } - - private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); - Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt()); - - ArrayNode subtasks = (ArrayNode) result.get("subtasks"); - - Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size()); - for (int x = 0; x < originalTask.getTaskVertices().length; x++) { - JsonNode subtask = subtasks.get(x); - AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x]; - - Assert.assertEquals(x, subtask.get("subtask").asInt()); - Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt()); - Assert.assertEquals(expectedSubtask.getCurrentAssignedResourceLocation().getHostname(), subtask.get("host").asText()); - - ArchivedJobGenerationUtils.compareStringifiedAccumulators( - expectedSubtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified(), - (ArrayNode) subtask.get("user-accumulators")); - } - } -}
