http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java deleted file mode 100644 index 31c2212..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the SubtasksTimesHandler. - */ -public class SubtasksTimesHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new SubtasksTimesHandler.SubtasksTimesJsonArchivist(); - AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", archive.getPath()); - compareSubtaskTimes(originalTask, originalAttempt, archive.getJson()); - } - - @Test - public void testGetPaths() { - SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = 
handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]); - } - - @Test - public void testJsonGeneration() throws Exception { - AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); - AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); - String json = SubtasksTimesHandler.createSubtaskTimesJson(originalTask); - - compareSubtaskTimes(originalTask, originalAttempt, json); - } - - private static void compareSubtaskTimes(AccessExecutionJobVertex originalTask, AccessExecution originalAttempt, String json) throws IOException { - JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); - - Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); - Assert.assertEquals(originalTask.getName(), result.get("name").asText()); - Assert.assertTrue(result.get("now").asLong() > 0L); - - ArrayNode subtasks = (ArrayNode) result.get("subtasks"); - - JsonNode subtask = subtasks.get(0); - Assert.assertEquals(0, subtask.get("subtask").asInt()); - Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), subtask.get("host").asText()); - Assert.assertEquals(originalAttempt.getStateTimestamp(originalAttempt.getState()) - originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), subtask.get("duration").asLong()); - - JsonNode timestamps = subtask.get("timestamps"); - - Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CREATED), timestamps.get(ExecutionState.CREATED.name()).asLong()); - Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), timestamps.get(ExecutionState.SCHEDULED.name()).asLong()); - Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING), timestamps.get(ExecutionState.DEPLOYING.name()).asLong()); - Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.RUNNING), 
timestamps.get(ExecutionState.RUNNING.name()).asLong()); - Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FINISHED), timestamps.get(ExecutionState.FINISHED.name()).asLong()); - Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELING), timestamps.get(ExecutionState.CANCELING.name()).asLong()); - Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELED), timestamps.get(ExecutionState.CANCELED.name()).asLong()); - Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FAILED), timestamps.get(ExecutionState.FAILED.name()).asLong()); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java deleted file mode 100644 index faeff13..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.blob.VoidBlobStore; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; - -import org.junit.Assert; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.isA; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; - -/** - * Tests for the TaskManagerLogHandler. 
- */ -public class TaskManagerLogHandlerTest { - @Test - public void testGetPaths() { - TaskManagerLogHandler handlerLog = new TaskManagerLogHandler( - mock(GatewayRetriever.class), - Executors.directExecutor(), - CompletableFuture.completedFuture("/jm/address"), - TestingUtils.TIMEOUT(), - TaskManagerLogHandler.FileMode.LOG, - new Configuration(), - new VoidBlobStore()); - String[] pathsLog = handlerLog.getPaths(); - Assert.assertEquals(1, pathsLog.length); - Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]); - - TaskManagerLogHandler handlerOut = new TaskManagerLogHandler( - mock(GatewayRetriever.class), - Executors.directExecutor(), - CompletableFuture.completedFuture("/jm/address"), - TestingUtils.TIMEOUT(), - TaskManagerLogHandler.FileMode.STDOUT, - new Configuration(), - new VoidBlobStore()); - String[] pathsOut = handlerOut.getPaths(); - Assert.assertEquals(1, pathsOut.length); - Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", pathsOut[0]); - } - - @Test - public void testLogFetchingFailure() throws Exception { - // ========= setup TaskManager ================================================================================= - InstanceID tmID = new InstanceID(); - ResourceID tmRID = new ResourceID(tmID.toString()); - TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class); - when(taskManagerGateway.getAddress()).thenReturn("/tm/address"); - - Instance taskManager = mock(Instance.class); - when(taskManager.getId()).thenReturn(tmID); - when(taskManager.getTaskManagerID()).thenReturn(tmRID); - when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway); - CompletableFuture<BlobKey> future = new CompletableFuture<>(); - future.completeExceptionally(new IOException("failure")); - when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future); - - // ========= setup JobManager ================================================================================== - - JobManagerGateway 
jobManagerGateway = mock(JobManagerGateway.class); - when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337)); - when(jobManagerGateway.getHostname()).thenReturn("localhost"); - when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class))).thenReturn( - CompletableFuture.completedFuture(Optional.of(taskManager))); - - GatewayRetriever<JobManagerGateway> retriever = mock(GatewayRetriever.class); - when(retriever.getNow()) - .thenReturn(Optional.of(jobManagerGateway)); - - TaskManagerLogHandler handler = new TaskManagerLogHandler( - retriever, - Executors.directExecutor(), - CompletableFuture.completedFuture("/jm/address"), - TestingUtils.TIMEOUT(), - TaskManagerLogHandler.FileMode.LOG, - new Configuration(), - new VoidBlobStore()); - - final AtomicReference<String> exception = new AtomicReference<>(); - - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - when(ctx.write(isA(ByteBuf.class))).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class); - exception.set(new String(data.array(), ConfigConstants.DEFAULT_CHARSET)); - return null; - } - }); - - Map<String, String> pathParams = new HashMap<>(); - pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString()); - Routed routed = mock(Routed.class); - when(routed.pathParams()).thenReturn(pathParams); - when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log")); - - handler.respondAsLeader(ctx, routed, jobManagerGateway); - - Assert.assertEquals("Fetching TaskManager log failed.", exception.get()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java deleted file mode 100644 index e3a71a1..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.Executors; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.List; - -/** - * Tests for the TaskManagersHandler. 
- */ -public class TaskManagersHandlerTest { - @Test - public void testGetPaths() { - TaskManagersHandler handler = new TaskManagersHandler(Executors.directExecutor(), Time.seconds(0L), null); - String[] paths = handler.getPaths(); - Assert.assertEquals(2, paths.length); - List<String> pathsList = Lists.newArrayList(paths); - Assert.assertTrue(pathsList.contains("/taskmanagers")); - Assert.assertTrue(pathsList.contains("/taskmanagers/:taskmanagerid")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java deleted file mode 100644 index 47298be..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests for the CheckpointConfigHandler. 
- */ -public class CheckpointConfigHandlerTest { - - @Test - public void testArchiver() throws IOException { - JsonArchivist archivist = new CheckpointConfigHandler.CheckpointConfigJsonArchivist(); - GraphAndSettings graphAndSettings = createGraphAndSettings(true, true); - - AccessExecutionGraph graph = graphAndSettings.graph; - when(graph.getJobID()).thenReturn(new JobID()); - JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings; - ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings; - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); - Assert.assertEquals(1, archives.size()); - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals("/jobs/" + graph.getJobID() + "/checkpoints/config", archive.getPath()); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(archive.getJson()); - - Assert.assertEquals("exactly_once", rootNode.get("mode").asText()); - Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong()); - Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong()); - Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong()); - Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt()); - - JsonNode externalizedNode = rootNode.get("externalization"); - Assert.assertNotNull(externalizedNode); - Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean()); - Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean()); - - } - - @Test - public void testGetPaths() { - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - 
Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]); - } - - /** - * Tests a simple config. - */ - @Test - public void testSimpleConfig() throws Exception { - GraphAndSettings graphAndSettings = createGraphAndSettings(false, true); - - AccessExecutionGraph graph = graphAndSettings.graph; - JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings; - - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(json); - - assertEquals("exactly_once", rootNode.get("mode").asText()); - assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong()); - assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong()); - assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong()); - assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt()); - - JsonNode externalizedNode = rootNode.get("externalization"); - assertNotNull(externalizedNode); - assertEquals(false, externalizedNode.get("enabled").asBoolean()); - } - - /** - * Tests that the isExactlyOnce flag is respected. 
- */ - @Test - public void testAtLeastOnce() throws Exception { - GraphAndSettings graphAndSettings = createGraphAndSettings(false, false); - - AccessExecutionGraph graph = graphAndSettings.graph; - - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(json); - - assertEquals("at_least_once", rootNode.get("mode").asText()); - } - - /** - * Tests that the externalized checkpoint settings are forwarded. - */ - @Test - public void testEnabledExternalizedCheckpointSettings() throws Exception { - GraphAndSettings graphAndSettings = createGraphAndSettings(true, false); - - AccessExecutionGraph graph = graphAndSettings.graph; - ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings; - - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode externalizedNode = mapper.readTree(json).get("externalization"); - assertNotNull(externalizedNode); - assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean()); - assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean()); - } - - private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) { - long interval = 18231823L; - long timeout = 996979L; - long minPause = 119191919L; - int maxConcurrent = 12929329; - ExternalizedCheckpointSettings externalizedSetting = externalized - ? 
ExternalizedCheckpointSettings.externalizeCheckpoints(true) - : ExternalizedCheckpointSettings.none(); - - JobCheckpointingSettings settings = new JobCheckpointingSettings( - Collections.<JobVertexID>emptyList(), - Collections.<JobVertexID>emptyList(), - Collections.<JobVertexID>emptyList(), - interval, - timeout, - minPause, - maxConcurrent, - externalizedSetting, - null, - exactlyOnce); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getJobCheckpointingSettings()).thenReturn(settings); - - return new GraphAndSettings(graph, settings, externalizedSetting); - } - - private static class GraphAndSettings { - public final AccessExecutionGraph graph; - public final JobCheckpointingSettings snapshottingSettings; - public final ExternalizedCheckpointSettings externalizedSettings; - - public GraphAndSettings( - AccessExecutionGraph graph, - JobCheckpointingSettings snapshottingSettings, - ExternalizedCheckpointSettings externalizedSettings) { - this.graph = graph; - this.snapshottingSettings = snapshottingSettings; - this.externalizedSettings = externalizedSettings; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java deleted file mode 100644 index bdb3faf..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests for the CheckpointStatsCache. 
- */ -public class CheckpointStatsCacheTest { - - @Test - public void testZeroSizeCache() throws Exception { - AbstractCheckpointStats checkpoint = createCheckpoint(0, CheckpointStatsStatus.COMPLETED); - - CheckpointStatsCache cache = new CheckpointStatsCache(0); - cache.tryAdd(checkpoint); - assertNull(cache.tryGet(0L)); - } - - @Test - public void testCacheAddAndGet() throws Exception { - AbstractCheckpointStats chk0 = createCheckpoint(0, CheckpointStatsStatus.COMPLETED); - AbstractCheckpointStats chk1 = createCheckpoint(1, CheckpointStatsStatus.COMPLETED); - AbstractCheckpointStats chk2 = createCheckpoint(2, CheckpointStatsStatus.IN_PROGRESS); - - CheckpointStatsCache cache = new CheckpointStatsCache(1); - cache.tryAdd(chk0); - assertEquals(chk0, cache.tryGet(0)); - - cache.tryAdd(chk1); - assertNull(cache.tryGet(0)); - assertEquals(chk1, cache.tryGet(1)); - - cache.tryAdd(chk2); - assertNull(cache.tryGet(2)); - assertNull(cache.tryGet(0)); - assertEquals(chk1, cache.tryGet(1)); - } - - private AbstractCheckpointStats createCheckpoint(long id, CheckpointStatsStatus status) { - AbstractCheckpointStats checkpoint = mock(AbstractCheckpointStats.class); - when(checkpoint.getCheckpointId()).thenReturn(id); - when(checkpoint.getStatus()).thenReturn(status); - return checkpoint; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java deleted file mode 100644 index f16d623..0000000 --- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointProperties; -import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; -import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; -import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; -import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; -import org.apache.flink.runtime.checkpoint.TaskStateStats; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import 
org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests for the CheckpointStatsDetailsHandler. - */ -public class CheckpointStatsDetailsHandlerTest { - - @Test - public void testArchiver() throws IOException { - JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(); - - CompletedCheckpointStats completedCheckpoint = createCompletedCheckpoint(); - FailedCheckpointStats failedCheckpoint = createFailedCheckpoint(); - List<AbstractCheckpointStats> checkpoints = new ArrayList<>(); - checkpoints.add(failedCheckpoint); - checkpoints.add(completedCheckpoint); - - CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); - when(history.getCheckpoints()).thenReturn(checkpoints); - CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); - when(snapshot.getHistory()).thenReturn(history); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - when(graph.getJobID()).thenReturn(new JobID()); - - ObjectMapper mapper = new ObjectMapper(); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); - Assert.assertEquals(2, archives.size()); - - 
Iterator<ArchivedJson> iterator = archives.iterator(); - ArchivedJson archive1 = iterator.next(); - Assert.assertEquals( - "/jobs/" + graph.getJobID() + "/checkpoints/details/" + failedCheckpoint.getCheckpointId(), - archive1.getPath()); - compareFailedCheckpoint(failedCheckpoint, mapper.readTree(archive1.getJson())); - - ArchivedJson archive2 = iterator.next(); - Assert.assertEquals( - "/jobs/" + graph.getJobID() + "/checkpoints/details/" + completedCheckpoint.getCheckpointId(), - archive2.getPath()); - compareCompletedCheckpoint(completedCheckpoint, mapper.readTree(archive2.getJson())); - } - - @Test - public void testGetPaths() { - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]); - } - - /** - * Tests request with illegal checkpoint ID param. - */ - @Test - public void testIllegalCheckpointId() throws Exception { - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "illegal checkpoint"); - String json = handler.handleRequest(graph, params).get(); - - assertEquals("{}", json); - } - - /** - * Tests request with missing checkpoint ID param. 
- */ - @Test - public void testNoCheckpointIdParam() throws Exception { - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); - - assertEquals("{}", json); - } - - /** - * Test lookup of not existing checkpoint in history. - */ - @Test - public void testCheckpointNotFound() throws Exception { - CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); - when(history.getCheckpointById(anyLong())).thenReturn(null); // not found - - CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); - when(snapshot.getHistory()).thenReturn(history); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "123"); - String json = handler.handleRequest(graph, params).get(); - - assertEquals("{}", json); - verify(history, times(1)).getCheckpointById(anyLong()); - } - - /** - * Tests a checkpoint details request for an in progress checkpoint. 
- */ - @Test - public void testCheckpointDetailsRequestInProgressCheckpoint() throws Exception { - PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class); - when(checkpoint.getCheckpointId()).thenReturn(1992139L); - when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); - when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); - when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L); - when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L); - when(checkpoint.getStateSize()).thenReturn(111939272822L); - when(checkpoint.getEndToEndDuration()).thenReturn(121191L); - when(checkpoint.getAlignmentBuffered()).thenReturn(1L); - when(checkpoint.getNumberOfSubtasks()).thenReturn(501); - when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(101); - - List<TaskStateStats> taskStats = new ArrayList<>(); - TaskStateStats task1 = createTaskStateStats(); - TaskStateStats task2 = createTaskStateStats(); - taskStats.add(task1); - taskStats.add(task2); - - when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats); - - JsonNode rootNode = triggerRequest(checkpoint); - - assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); - assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); - assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); - assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); - assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); - assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); - assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong()); - assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong()); - assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); - 
assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt()); - - verifyTaskNodes(taskStats, rootNode); - } - - /** - * Tests a checkpoint details request for a completed checkpoint. - */ - @Test - public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception { - CompletedCheckpointStats checkpoint = createCompletedCheckpoint(); - - JsonNode rootNode = triggerRequest(checkpoint); - - compareCompletedCheckpoint(checkpoint, rootNode); - - verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode); - } - - /** - * Tests a checkpoint details request for a failed checkpoint. - */ - @Test - public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception { - FailedCheckpointStats checkpoint = createFailedCheckpoint(); - - JsonNode rootNode = triggerRequest(checkpoint); - - compareFailedCheckpoint(checkpoint, rootNode); - - verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode); - } - - // ------------------------------------------------------------------------ - - private static CompletedCheckpointStats createCompletedCheckpoint() { - CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class); - when(checkpoint.getCheckpointId()).thenReturn(1818213L); - when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED); - when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint()); - when(checkpoint.getTriggerTimestamp()).thenReturn(1818L); - when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L); - when(checkpoint.getStateSize()).thenReturn(925281L); - when(checkpoint.getEndToEndDuration()).thenReturn(181819L); - when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L); - when(checkpoint.getNumberOfSubtasks()).thenReturn(181271); - when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821); - when(checkpoint.isDiscarded()).thenReturn(true); - 
when(checkpoint.getExternalPath()).thenReturn("checkpoint-external-path"); - - List<TaskStateStats> taskStats = new ArrayList<>(); - TaskStateStats task1 = createTaskStateStats(); - TaskStateStats task2 = createTaskStateStats(); - taskStats.add(task1); - taskStats.add(task2); - - when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats); - - return checkpoint; - } - - private static void compareCompletedCheckpoint(CompletedCheckpointStats checkpoint, JsonNode rootNode) { - assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); - assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); - assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); - assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); - assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); - assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); - assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong()); - assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong()); - assertEquals(checkpoint.isDiscarded(), rootNode.get("discarded").asBoolean()); - assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText()); - assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); - assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt()); - } - - private static FailedCheckpointStats createFailedCheckpoint() { - FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class); - when(checkpoint.getCheckpointId()).thenReturn(1818214L); - when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED); - when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint()); - when(checkpoint.getTriggerTimestamp()).thenReturn(1818L); 
- when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L); - when(checkpoint.getStateSize()).thenReturn(925281L); - when(checkpoint.getEndToEndDuration()).thenReturn(181819L); - when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L); - when(checkpoint.getNumberOfSubtasks()).thenReturn(181271); - when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821); - when(checkpoint.getFailureTimestamp()).thenReturn(123012890312093L); - when(checkpoint.getFailureMessage()).thenReturn("failure-message"); - - List<TaskStateStats> taskStats = new ArrayList<>(); - TaskStateStats task1 = createTaskStateStats(); - TaskStateStats task2 = createTaskStateStats(); - taskStats.add(task1); - taskStats.add(task2); - - when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats); - - return checkpoint; - } - - private static void compareFailedCheckpoint(FailedCheckpointStats checkpoint, JsonNode rootNode) { - assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); - assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); - assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); - assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); - assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); - assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); - assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong()); - assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong()); - assertEquals(checkpoint.getFailureTimestamp(), rootNode.get("failure_timestamp").asLong()); - assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText()); - assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); - assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt()); - } - - private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception { - CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); - when(history.getCheckpointById(anyLong())).thenReturn(checkpoint); - CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); - when(snapshot.getHistory()).thenReturn(history); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - - CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "123"); - String json = handler.handleRequest(graph, params).get(); - - ObjectMapper mapper = new ObjectMapper(); - return mapper.readTree(json); - } - - private static void verifyTaskNodes(Collection<TaskStateStats> tasks, JsonNode parentNode) { - for (TaskStateStats task : tasks) { - long duration = ThreadLocalRandom.current().nextInt(128); - - JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString()); - assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong()); - assertEquals(task.getStateSize(), taskNode.get("state_size").asLong()); - assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong()); - assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong()); - assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt()); - assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt()); - } - } - - private static TaskStateStats createTaskStateStats() { - ThreadLocalRandom rand = ThreadLocalRandom.current(); - - TaskStateStats task = mock(TaskStateStats.class); - 
when(task.getJobVertexId()).thenReturn(new JobVertexID()); - when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1); - when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1); - when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1); - when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1); - when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) + 1); - when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(rand.nextInt(1024) + 1); - return task; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java deleted file mode 100644 index ed73a62..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java +++ /dev/null @@ -1,432 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointProperties; -import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; -import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; -import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary; -import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; -import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; -import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; -import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests for the CheckpointStatsHandler. 
- */ -public class CheckpointStatsHandlerTest { - - @Test - public void testArchiver() throws IOException { - JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(); - TestCheckpointStats testCheckpointStats = createTestCheckpointStats(); - when(testCheckpointStats.graph.getJobID()).thenReturn(new JobID()); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(testCheckpointStats.graph); - Assert.assertEquals(3, archives.size()); - - ObjectMapper mapper = new ObjectMapper(); - - Iterator<ArchivedJson> iterator = archives.iterator(); - ArchivedJson archive1 = iterator.next(); - Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.inProgress.getCheckpointId(), archive1.getPath()); - compareInProgressCheckpoint(testCheckpointStats.inProgress, mapper.readTree(archive1.getJson())); - - ArchivedJson archive2 = iterator.next(); - Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.completedSavepoint.getCheckpointId(), archive2.getPath()); - compareCompletedSavepoint(testCheckpointStats.completedSavepoint, mapper.readTree(archive2.getJson())); - - ArchivedJson archive3 = iterator.next(); - Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.failed.getCheckpointId(), archive3.getPath()); - compareFailedCheckpoint(testCheckpointStats.failed, mapper.readTree(archive3.getJson())); - } - - @Test - public void testGetPaths() { - CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]); - } - - /** - * Tests a complete checkpoint stats snapshot. 
- */ - @Test - public void testCheckpointStatsRequest() throws Exception { - TestCheckpointStats testCheckpointStats = createTestCheckpointStats(); - - CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); - String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap()).get(); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(json); - - compareCheckpointStats(testCheckpointStats, rootNode); - } - - private static TestCheckpointStats createTestCheckpointStats() { - // Counts - CheckpointStatsCounts counts = mock(CheckpointStatsCounts.class); - when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L); - when(counts.getTotalNumberOfCheckpoints()).thenReturn(12981231203L); - when(counts.getNumberOfInProgressCheckpoints()).thenReturn(191919); - when(counts.getNumberOfCompletedCheckpoints()).thenReturn(882828200L); - when(counts.getNumberOfFailedCheckpoints()).thenReturn(99171510L); - - // Summary - CompletedCheckpointStatsSummary summary = mock(CompletedCheckpointStatsSummary.class); - - MinMaxAvgStats stateSizeSummary = mock(MinMaxAvgStats.class); - when(stateSizeSummary.getMinimum()).thenReturn(81238123L); - when(stateSizeSummary.getMaximum()).thenReturn(19919191999L); - when(stateSizeSummary.getAverage()).thenReturn(1133L); - - MinMaxAvgStats durationSummary = mock(MinMaxAvgStats.class); - when(durationSummary.getMinimum()).thenReturn(1182L); - when(durationSummary.getMaximum()).thenReturn(88654L); - when(durationSummary.getAverage()).thenReturn(171L); - - MinMaxAvgStats alignmentBufferedSummary = mock(MinMaxAvgStats.class); - when(alignmentBufferedSummary.getMinimum()).thenReturn(81818181899L); - when(alignmentBufferedSummary.getMaximum()).thenReturn(89999911118654L); - when(alignmentBufferedSummary.getAverage()).thenReturn(11203131L); - - when(summary.getStateSizeStats()).thenReturn(stateSizeSummary); - 
when(summary.getEndToEndDurationStats()).thenReturn(durationSummary); - when(summary.getAlignmentBufferedStats()).thenReturn(alignmentBufferedSummary); - - // Latest - CompletedCheckpointStats latestCompleted = mock(CompletedCheckpointStats.class); - when(latestCompleted.getCheckpointId()).thenReturn(1992139L); - when(latestCompleted.getTriggerTimestamp()).thenReturn(1919191900L); - when(latestCompleted.getLatestAckTimestamp()).thenReturn(1977791901L); - when(latestCompleted.getStateSize()).thenReturn(111939272822L); - when(latestCompleted.getEndToEndDuration()).thenReturn(121191L); - when(latestCompleted.getAlignmentBuffered()).thenReturn(1L); - when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path"); - - CompletedCheckpointStats latestSavepoint = mock(CompletedCheckpointStats.class); - when(latestSavepoint.getCheckpointId()).thenReturn(1992140L); - when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L); - when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L); - when(latestSavepoint.getStateSize()).thenReturn(111939272822L); - when(latestSavepoint.getEndToEndDuration()).thenReturn(121191L); - when(latestCompleted.getAlignmentBuffered()).thenReturn(182813L); - when(latestSavepoint.getExternalPath()).thenReturn("savepoint-external-path"); - - FailedCheckpointStats latestFailed = mock(FailedCheckpointStats.class); - when(latestFailed.getCheckpointId()).thenReturn(1112L); - when(latestFailed.getTriggerTimestamp()).thenReturn(12828L); - when(latestFailed.getLatestAckTimestamp()).thenReturn(1901L); - when(latestFailed.getFailureTimestamp()).thenReturn(11999976L); - when(latestFailed.getStateSize()).thenReturn(111L); - when(latestFailed.getEndToEndDuration()).thenReturn(12L); - when(latestFailed.getAlignmentBuffered()).thenReturn(2L); - when(latestFailed.getFailureMessage()).thenReturn("expected cause"); - - RestoredCheckpointStats latestRestored = mock(RestoredCheckpointStats.class); - 
when(latestRestored.getCheckpointId()).thenReturn(1199L); - when(latestRestored.getRestoreTimestamp()).thenReturn(434242L); - when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint()); - when(latestRestored.getExternalPath()).thenReturn("restored savepoint path"); - - // History - CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); - List<AbstractCheckpointStats> checkpoints = new ArrayList<>(); - - PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class); - when(inProgress.getCheckpointId()).thenReturn(1992141L); - when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); - when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); - when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L); - when(inProgress.getLatestAckTimestamp()).thenReturn(1977791901L); - when(inProgress.getStateSize()).thenReturn(111939272822L); - when(inProgress.getEndToEndDuration()).thenReturn(121191L); - when(inProgress.getAlignmentBuffered()).thenReturn(1L); - when(inProgress.getNumberOfSubtasks()).thenReturn(501); - when(inProgress.getNumberOfAcknowledgedSubtasks()).thenReturn(101); - - CompletedCheckpointStats completedSavepoint = mock(CompletedCheckpointStats.class); - when(completedSavepoint.getCheckpointId()).thenReturn(1322139L); - when(completedSavepoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED); - when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint()); - when(completedSavepoint.getTriggerTimestamp()).thenReturn(191900L); - when(completedSavepoint.getLatestAckTimestamp()).thenReturn(197791901L); - when(completedSavepoint.getStateSize()).thenReturn(1119822L); - when(completedSavepoint.getEndToEndDuration()).thenReturn(12191L); - when(completedSavepoint.getAlignmentBuffered()).thenReturn(111L); - when(completedSavepoint.getNumberOfSubtasks()).thenReturn(33501); - 
when(completedSavepoint.getNumberOfAcknowledgedSubtasks()).thenReturn(211); - when(completedSavepoint.isDiscarded()).thenReturn(true); - when(completedSavepoint.getExternalPath()).thenReturn("completed-external-path"); - - FailedCheckpointStats failed = mock(FailedCheckpointStats.class); - when(failed.getCheckpointId()).thenReturn(110719L); - when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED); - when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); - when(failed.getTriggerTimestamp()).thenReturn(191900L); - when(failed.getLatestAckTimestamp()).thenReturn(197791901L); - when(failed.getStateSize()).thenReturn(1119822L); - when(failed.getEndToEndDuration()).thenReturn(12191L); - when(failed.getAlignmentBuffered()).thenReturn(111L); - when(failed.getNumberOfSubtasks()).thenReturn(33501); - when(failed.getNumberOfAcknowledgedSubtasks()).thenReturn(1); - when(failed.getFailureTimestamp()).thenReturn(119230L); - when(failed.getFailureMessage()).thenReturn("failure message"); - - checkpoints.add(inProgress); - checkpoints.add(completedSavepoint); - checkpoints.add(failed); - when(history.getCheckpoints()).thenReturn(checkpoints); - when(history.getLatestCompletedCheckpoint()).thenReturn(latestCompleted); - when(history.getLatestSavepoint()).thenReturn(latestSavepoint); - when(history.getLatestFailedCheckpoint()).thenReturn(latestFailed); - - CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); - when(snapshot.getCounts()).thenReturn(counts); - when(snapshot.getSummaryStats()).thenReturn(summary); - when(snapshot.getHistory()).thenReturn(history); - when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - - return new TestCheckpointStats( - graph, counts, stateSizeSummary, durationSummary, alignmentBufferedSummary, summary, - latestCompleted, latestSavepoint, 
latestFailed, latestRestored, inProgress, - completedSavepoint, failed, history, snapshot - ); - } - - private static void compareCheckpointStats(TestCheckpointStats checkpointStats, JsonNode rootNode) { - CheckpointStatsCounts counts = checkpointStats.counts; - JsonNode countNode = rootNode.get("counts"); - assertEquals(counts.getNumberOfRestoredCheckpoints(), countNode.get("restored").asLong()); - assertEquals(counts.getTotalNumberOfCheckpoints(), countNode.get("total").asLong()); - assertEquals(counts.getNumberOfInProgressCheckpoints(), countNode.get("in_progress").asLong()); - assertEquals(counts.getNumberOfCompletedCheckpoints(), countNode.get("completed").asLong()); - assertEquals(counts.getNumberOfFailedCheckpoints(), countNode.get("failed").asLong()); - - MinMaxAvgStats stateSizeSummary = checkpointStats.stateSizeSummary; - JsonNode summaryNode = rootNode.get("summary"); - JsonNode sizeSummaryNode = summaryNode.get("state_size"); - assertEquals(stateSizeSummary.getMinimum(), sizeSummaryNode.get("min").asLong()); - assertEquals(stateSizeSummary.getMaximum(), sizeSummaryNode.get("max").asLong()); - assertEquals(stateSizeSummary.getAverage(), sizeSummaryNode.get("avg").asLong()); - - MinMaxAvgStats durationSummary = checkpointStats.durationSummary; - JsonNode durationSummaryNode = summaryNode.get("end_to_end_duration"); - assertEquals(durationSummary.getMinimum(), durationSummaryNode.get("min").asLong()); - assertEquals(durationSummary.getMaximum(), durationSummaryNode.get("max").asLong()); - assertEquals(durationSummary.getAverage(), durationSummaryNode.get("avg").asLong()); - - MinMaxAvgStats alignmentBufferedSummary = checkpointStats.alignmentBufferedSummary; - JsonNode alignmentBufferedNode = summaryNode.get("alignment_buffered"); - assertEquals(alignmentBufferedSummary.getMinimum(), alignmentBufferedNode.get("min").asLong()); - assertEquals(alignmentBufferedSummary.getMaximum(), alignmentBufferedNode.get("max").asLong()); - 
assertEquals(alignmentBufferedSummary.getAverage(), alignmentBufferedNode.get("avg").asLong()); - - CompletedCheckpointStats latestCompleted = checkpointStats.latestCompleted; - JsonNode latestNode = rootNode.get("latest"); - JsonNode latestCheckpointNode = latestNode.get("completed"); - assertEquals(latestCompleted.getCheckpointId(), latestCheckpointNode.get("id").asLong()); - assertEquals(latestCompleted.getTriggerTimestamp(), latestCheckpointNode.get("trigger_timestamp").asLong()); - assertEquals(latestCompleted.getLatestAckTimestamp(), latestCheckpointNode.get("latest_ack_timestamp").asLong()); - assertEquals(latestCompleted.getStateSize(), latestCheckpointNode.get("state_size").asLong()); - assertEquals(latestCompleted.getEndToEndDuration(), latestCheckpointNode.get("end_to_end_duration").asLong()); - assertEquals(latestCompleted.getAlignmentBuffered(), latestCheckpointNode.get("alignment_buffered").asLong()); - assertEquals(latestCompleted.getExternalPath(), latestCheckpointNode.get("external_path").asText()); - - CompletedCheckpointStats latestSavepoint = checkpointStats.latestSavepoint; - JsonNode latestSavepointNode = latestNode.get("savepoint"); - assertEquals(latestSavepoint.getCheckpointId(), latestSavepointNode.get("id").asLong()); - assertEquals(latestSavepoint.getTriggerTimestamp(), latestSavepointNode.get("trigger_timestamp").asLong()); - assertEquals(latestSavepoint.getLatestAckTimestamp(), latestSavepointNode.get("latest_ack_timestamp").asLong()); - assertEquals(latestSavepoint.getStateSize(), latestSavepointNode.get("state_size").asLong()); - assertEquals(latestSavepoint.getEndToEndDuration(), latestSavepointNode.get("end_to_end_duration").asLong()); - assertEquals(latestSavepoint.getAlignmentBuffered(), latestSavepointNode.get("alignment_buffered").asLong()); - assertEquals(latestSavepoint.getExternalPath(), latestSavepointNode.get("external_path").asText()); - - FailedCheckpointStats latestFailed = checkpointStats.latestFailed; - JsonNode 
latestFailedNode = latestNode.get("failed"); - assertEquals(latestFailed.getCheckpointId(), latestFailedNode.get("id").asLong()); - assertEquals(latestFailed.getTriggerTimestamp(), latestFailedNode.get("trigger_timestamp").asLong()); - assertEquals(latestFailed.getLatestAckTimestamp(), latestFailedNode.get("latest_ack_timestamp").asLong()); - assertEquals(latestFailed.getStateSize(), latestFailedNode.get("state_size").asLong()); - assertEquals(latestFailed.getEndToEndDuration(), latestFailedNode.get("end_to_end_duration").asLong()); - assertEquals(latestFailed.getAlignmentBuffered(), latestFailedNode.get("alignment_buffered").asLong()); - assertEquals(latestFailed.getFailureTimestamp(), latestFailedNode.get("failure_timestamp").asLong()); - assertEquals(latestFailed.getFailureMessage(), latestFailedNode.get("failure_message").asText()); - - RestoredCheckpointStats latestRestored = checkpointStats.latestRestored; - JsonNode latestRestoredNode = latestNode.get("restored"); - assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong()); - assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong()); - assertEquals(latestRestored.getProperties().isSavepoint(), latestRestoredNode.get("is_savepoint").asBoolean()); - assertEquals(latestRestored.getExternalPath(), latestRestoredNode.get("external_path").asText()); - - JsonNode historyNode = rootNode.get("history"); - Iterator<JsonNode> it = historyNode.iterator(); - - assertTrue(it.hasNext()); - JsonNode inProgressNode = it.next(); - - PendingCheckpointStats inProgress = checkpointStats.inProgress; - compareInProgressCheckpoint(inProgress, inProgressNode); - - assertTrue(it.hasNext()); - JsonNode completedSavepointNode = it.next(); - - CompletedCheckpointStats completedSavepoint = checkpointStats.completedSavepoint; - compareCompletedSavepoint(completedSavepoint, completedSavepointNode); - - assertTrue(it.hasNext()); - JsonNode failedNode = it.next(); - - 
FailedCheckpointStats failed = checkpointStats.failed; - compareFailedCheckpoint(failed, failedNode); - - assertFalse(it.hasNext()); - } - - /** Asserts that the JSON fields id, status, is_savepoint, trigger_timestamp, latest_ack_timestamp, state_size, end_to_end_duration, alignment_buffered, num_subtasks and num_acknowledged_subtasks of {@code inProgressNode} equal the values returned by the corresponding getters of {@code inProgress}. */ private static void compareInProgressCheckpoint(PendingCheckpointStats inProgress, JsonNode inProgressNode) { - assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong()); - assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText()); - assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean()); - assertEquals(inProgress.getTriggerTimestamp(), inProgressNode.get("trigger_timestamp").asLong()); - assertEquals(inProgress.getLatestAckTimestamp(), inProgressNode.get("latest_ack_timestamp").asLong()); - assertEquals(inProgress.getStateSize(), inProgressNode.get("state_size").asLong()); - assertEquals(inProgress.getEndToEndDuration(), inProgressNode.get("end_to_end_duration").asLong()); - assertEquals(inProgress.getAlignmentBuffered(), inProgressNode.get("alignment_buffered").asLong()); - assertEquals(inProgress.getNumberOfSubtasks(), inProgressNode.get("num_subtasks").asInt()); - assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), inProgressNode.get("num_acknowledged_subtasks").asInt()); - } - - /** Asserts the same ten common checkpoint fields as the in-progress comparison (id through num_acknowledged_subtasks) against {@code completedSavepoint}, and additionally the completed-only fields external_path and discarded. */ private static void compareCompletedSavepoint(CompletedCheckpointStats completedSavepoint, JsonNode completedSavepointNode) { - assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong()); - assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText()); - assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean()); - assertEquals(completedSavepoint.getTriggerTimestamp(), completedSavepointNode.get("trigger_timestamp").asLong()); - assertEquals(completedSavepoint.getLatestAckTimestamp(), completedSavepointNode.get("latest_ack_timestamp").asLong()); - assertEquals(completedSavepoint.getStateSize(), 
completedSavepointNode.get("state_size").asLong()); - assertEquals(completedSavepoint.getEndToEndDuration(), completedSavepointNode.get("end_to_end_duration").asLong()); - assertEquals(completedSavepoint.getAlignmentBuffered(), completedSavepointNode.get("alignment_buffered").asLong()); - assertEquals(completedSavepoint.getNumberOfSubtasks(), completedSavepointNode.get("num_subtasks").asInt()); - assertEquals(completedSavepoint.getNumberOfAcknowledgedSubtasks(), completedSavepointNode.get("num_acknowledged_subtasks").asInt()); - - assertEquals(completedSavepoint.getExternalPath(), completedSavepointNode.get("external_path").asText()); - assertEquals(completedSavepoint.isDiscarded(), completedSavepointNode.get("discarded").asBoolean()); - } - - /** Asserts the ten common checkpoint fields (id, status, is_savepoint, trigger_timestamp, latest_ack_timestamp, state_size, end_to_end_duration, alignment_buffered, num_subtasks, num_acknowledged_subtasks) of {@code failedNode} against {@code failed}, and additionally the failure-only fields failure_timestamp and failure_message. */ private static void compareFailedCheckpoint(FailedCheckpointStats failed, JsonNode failedNode) { - assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong()); - assertEquals(failed.getStatus().toString(), failedNode.get("status").asText()); - assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean()); - assertEquals(failed.getTriggerTimestamp(), failedNode.get("trigger_timestamp").asLong()); - assertEquals(failed.getLatestAckTimestamp(), failedNode.get("latest_ack_timestamp").asLong()); - assertEquals(failed.getStateSize(), failedNode.get("state_size").asLong()); - assertEquals(failed.getEndToEndDuration(), failedNode.get("end_to_end_duration").asLong()); - assertEquals(failed.getAlignmentBuffered(), failedNode.get("alignment_buffered").asLong()); - assertEquals(failed.getNumberOfSubtasks(), failedNode.get("num_subtasks").asInt()); - assertEquals(failed.getNumberOfAcknowledgedSubtasks(), failedNode.get("num_acknowledged_subtasks").asInt()); - - assertEquals(failed.getFailureTimestamp(), failedNode.get("failure_timestamp").asLong()); - assertEquals(failed.getFailureMessage(), failedNode.get("failure_message").asText()); - } - - /** Plain holder that bundles the execution graph together with the checkpoint statistics objects (counts, summaries, latest checkpoints, history entries, snapshot) used by this test; all fields are public and final and are set once by the constructor. */ private static class TestCheckpointStats { - public 
final AccessExecutionGraph graph; - public final CheckpointStatsCounts counts; - public final MinMaxAvgStats stateSizeSummary; - public final MinMaxAvgStats durationSummary; - public final MinMaxAvgStats alignmentBufferedSummary; - public final CompletedCheckpointStatsSummary summary; - public final CompletedCheckpointStats latestCompleted; - public final CompletedCheckpointStats latestSavepoint; - public final FailedCheckpointStats latestFailed; - public final RestoredCheckpointStats latestRestored; - public final PendingCheckpointStats inProgress; - public final CompletedCheckpointStats completedSavepoint; - public final FailedCheckpointStats failed; - public final CheckpointStatsHistory history; - public final CheckpointStatsSnapshot snapshot; - - /** Stores each argument in the field of the same name; performs no validation or copying. */ public TestCheckpointStats( - AccessExecutionGraph graph, - CheckpointStatsCounts counts, - MinMaxAvgStats stateSizeSummary, - MinMaxAvgStats durationSummary, - MinMaxAvgStats alignmentBufferedSummary, - CompletedCheckpointStatsSummary summary, - CompletedCheckpointStats latestCompleted, - CompletedCheckpointStats latestSavepoint, - FailedCheckpointStats latestFailed, - RestoredCheckpointStats latestRestored, - PendingCheckpointStats inProgress, - CompletedCheckpointStats completedSavepoint, - FailedCheckpointStats failed, - CheckpointStatsHistory history, - CheckpointStatsSnapshot snapshot) { - this.graph = graph; - this.counts = counts; - this.stateSizeSummary = stateSizeSummary; - this.durationSummary = durationSummary; - this.alignmentBufferedSummary = alignmentBufferedSummary; - this.summary = summary; - this.latestCompleted = latestCompleted; - this.latestSavepoint = latestSavepoint; - this.latestFailed = latestFailed; - this.latestRestored = latestRestored; - this.inProgress = inProgress; - this.completedSavepoint = completedSavepoint; - this.failed = failed; - this.history = history; - this.snapshot = snapshot; - } - } -}
