Repository: flink Updated Branches: refs/heads/master 243ef69bf -> 7fe0eb477
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java index 939f439..0076d42 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java @@ -21,12 +21,34 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; + public class SubtasksTimesHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new SubtasksTimesHandler.SubtasksTimesJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", archive.getPath()); + compareSubtaskTimes(originalTask, originalAttempt, archive.getJson()); + } + @Test public void testGetPaths() { SubtasksTimesHandler handler = new SubtasksTimesHandler(null); @@ -41,6 +63,10 @@ public class SubtasksTimesHandlerTest { AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); String json = SubtasksTimesHandler.createSubtaskTimesJson(originalTask); + compareSubtaskTimes(originalTask, originalAttempt, json); + } + + private static void compareSubtaskTimes(AccessExecutionJobVertex originalTask, AccessExecution originalAttempt, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java index e570e18..9d339f5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java @@ -20,14 +20,20 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; import java.util.Collections; import static org.junit.Assert.assertEquals; @@ -38,6 +44,37 @@ import static org.mockito.Mockito.when; public class CheckpointConfigHandlerTest { @Test + public void testArchiver() throws IOException { + JsonArchivist archivist = new CheckpointConfigHandler.CheckpointConfigJsonArchivist(); + GraphAndSettings graphAndSettings = createGraphAndSettings(true, true); + + AccessExecutionGraph graph = graphAndSettings.graph; + when(graph.getJobID()).thenReturn(new JobID()); + JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings; + ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings; + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); + Assert.assertEquals(1, archives.size()); + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + graph.getJobID() + "/checkpoints/config", archive.getPath()); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(archive.getJson()); + + Assert.assertEquals("exactly_once", rootNode.get("mode").asText()); + Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong()); + Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong()); + Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong()); + Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt()); + + JsonNode externalizedNode = rootNode.get("externalization"); + Assert.assertNotNull(externalizedNode); + Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean()); + Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean()); + + } + + @Test public void testGetPaths() { CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); @@ -50,26 +87,10 @@ public class CheckpointConfigHandlerTest { */ @Test public void testSimpleConfig() throws Exception { - long interval = 18231823L; - long timeout = 996979L; - long minPause = 119191919L; - int maxConcurrent = 12929329; - ExternalizedCheckpointSettings externalized = ExternalizedCheckpointSettings.none(); + GraphAndSettings graphAndSettings = createGraphAndSettings(false, true); - JobSnapshottingSettings settings = new JobSnapshottingSettings( - Collections.<JobVertexID>emptyList(), - Collections.<JobVertexID>emptyList(), - Collections.<JobVertexID>emptyList(), - interval, - timeout, - minPause, - maxConcurrent, - externalized, - null, - true); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getJobSnapshottingSettings()).thenReturn(settings); + AccessExecutionGraph graph = graphAndSettings.graph; + JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings; CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); @@ -78,10 +99,10 @@ public class CheckpointConfigHandlerTest { JsonNode rootNode = mapper.readTree(json); assertEquals("exactly_once", rootNode.get("mode").asText()); - assertEquals(interval, rootNode.get("interval").asLong()); - assertEquals(timeout, rootNode.get("timeout").asLong()); - assertEquals(minPause, rootNode.get("min_pause").asLong()); - assertEquals(maxConcurrent, rootNode.get("max_concurrent").asInt()); + assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong()); + assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong()); + assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong()); + assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt()); JsonNode externalizedNode = rootNode.get("externalization"); assertNotNull(externalizedNode); @@ -93,20 +114,9 @@ public class CheckpointConfigHandlerTest { */ @Test public void testAtLeastOnce() throws Exception { - JobSnapshottingSettings settings = new JobSnapshottingSettings( - Collections.<JobVertexID>emptyList(), - Collections.<JobVertexID>emptyList(), - Collections.<JobVertexID>emptyList(), - 996979L, - 1818L, - 1212L, - 12, - ExternalizedCheckpointSettings.none(), - null, - false); // at least once + GraphAndSettings graphAndSettings = createGraphAndSettings(false, false); - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getJobSnapshottingSettings()).thenReturn(settings); + AccessExecutionGraph graph = graphAndSettings.graph; CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); @@ -122,30 +132,60 @@ public class CheckpointConfigHandlerTest { */ @Test public void testEnabledExternalizedCheckpointSettings() throws Exception { - ExternalizedCheckpointSettings externalizedSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(true); + GraphAndSettings graphAndSettings = createGraphAndSettings(true, false); + + AccessExecutionGraph graph = graphAndSettings.graph; + ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings; + + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); + String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode externalizedNode = mapper.readTree(json).get("externalization"); + assertNotNull(externalizedNode); + assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean()); + assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean()); + } + + private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) { + long interval = 18231823L; + long timeout = 996979L; + long minPause = 119191919L; + int maxConcurrent = 12929329; + ExternalizedCheckpointSettings externalizedSetting = externalized + ? ExternalizedCheckpointSettings.externalizeCheckpoints(true) + : ExternalizedCheckpointSettings.none(); JobSnapshottingSettings settings = new JobSnapshottingSettings( Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), - 996979L, - 1818L, - 1212L, - 12, - externalizedSettings, + interval, + timeout, + minPause, + maxConcurrent, + externalizedSetting, null, - false); // at least once + exactlyOnce); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); when(graph.getJobSnapshottingSettings()).thenReturn(settings); - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); - String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); + return new GraphAndSettings(graph, settings, externalizedSetting); + } - ObjectMapper mapper = new ObjectMapper(); - JsonNode externalizedNode = mapper.readTree(json).get("externalization"); - assertNotNull(externalizedNode); - assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean()); - assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean()); + private static class GraphAndSettings { + public final AccessExecutionGraph graph; + public final JobSnapshottingSettings snapshottingSettings; + public final ExternalizedCheckpointSettings externalizedSettings; + + public GraphAndSettings( + AccessExecutionGraph graph, + JobSnapshottingSettings snapshottingSettings, + ExternalizedCheckpointSettings externalizedSettings) { + this.graph = graph; + this.snapshottingSettings = snapshottingSettings; + this.externalizedSettings = externalizedSettings; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java index ca9b606..770b032 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; @@ -32,12 +33,19 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + + import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -50,6 +58,44 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class CheckpointStatsDetailsHandlerTest { + + @Test + public void testArchiver() throws IOException { + JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(); + + CompletedCheckpointStats completedCheckpoint = createCompletedCheckpoint(); + FailedCheckpointStats failedCheckpoint = createFailedCheckpoint(); + List<AbstractCheckpointStats> checkpoints = new ArrayList<>(); + checkpoints.add(failedCheckpoint); + checkpoints.add(completedCheckpoint); + + CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); + when(history.getCheckpoints()).thenReturn(checkpoints); + CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); + when(snapshot.getHistory()).thenReturn(history); + + AccessExecutionGraph graph = mock(AccessExecutionGraph.class); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); + when(graph.getJobID()).thenReturn(new JobID()); + + ObjectMapper mapper = new ObjectMapper(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); + Assert.assertEquals(2, archives.size()); + + Iterator<ArchivedJson> iterator = archives.iterator(); + ArchivedJson archive1 = iterator.next(); + Assert.assertEquals( + "/jobs/" + graph.getJobID() + "/checkpoints/details/" + failedCheckpoint.getCheckpointId(), + archive1.getPath()); + compareFailedCheckpoint(failedCheckpoint, mapper.readTree(archive1.getJson())); + + ArchivedJson archive2 = iterator.next(); + Assert.assertEquals( + "/jobs/" + graph.getJobID() + "/checkpoints/details/" + completedCheckpoint.getCheckpointId(), + archive2.getPath()); + compareCompletedCheckpoint(completedCheckpoint, mapper.readTree(archive2.getJson())); + } @Test public void testGetPaths() { @@ -146,8 +192,7 @@ public class CheckpointStatsDetailsHandlerTest { assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt()); - verifyTaskNode(task1, rootNode); - verifyTaskNode(task2, rootNode); + verifyTaskNodes(taskStats, rootNode); } /** @@ -155,6 +200,32 @@ public class CheckpointStatsDetailsHandlerTest { */ @Test public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception { + CompletedCheckpointStats checkpoint = createCompletedCheckpoint(); + + JsonNode rootNode = triggerRequest(checkpoint); + + compareCompletedCheckpoint(checkpoint, rootNode); + + verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode); + } + + /** + * Tests a checkpoint details request for a failed checkpoint. + */ + @Test + public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception { + FailedCheckpointStats checkpoint = createFailedCheckpoint(); + + JsonNode rootNode = triggerRequest(checkpoint); + + compareFailedCheckpoint(checkpoint, rootNode); + + verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode); + } + + // ------------------------------------------------------------------------ + + private static CompletedCheckpointStats createCompletedCheckpoint() { CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class); when(checkpoint.getCheckpointId()).thenReturn(1818213L); when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED); @@ -177,8 +248,10 @@ public class CheckpointStatsDetailsHandlerTest { when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats); - JsonNode rootNode = triggerRequest(checkpoint); + return checkpoint; + } + private static void compareCompletedCheckpoint(CompletedCheckpointStats checkpoint, JsonNode rootNode) { assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); @@ -191,18 +264,11 @@ public class CheckpointStatsDetailsHandlerTest { assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText()); assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt()); - - verifyTaskNode(task1, rootNode); - verifyTaskNode(task2, rootNode); } - /** - * Tests a checkpoint details request for a failed checkpoint. - */ - @Test - public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception { + private static FailedCheckpointStats createFailedCheckpoint() { FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class); - when(checkpoint.getCheckpointId()).thenReturn(1818213L); + when(checkpoint.getCheckpointId()).thenReturn(1818214L); when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED); when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint()); when(checkpoint.getTriggerTimestamp()).thenReturn(1818L); @@ -223,8 +289,10 @@ public class CheckpointStatsDetailsHandlerTest { when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats); - JsonNode rootNode = triggerRequest(checkpoint); + return checkpoint; + } + private static void compareFailedCheckpoint(FailedCheckpointStats checkpoint, JsonNode rootNode) { assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); @@ -237,13 +305,8 @@ public class CheckpointStatsDetailsHandlerTest { assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText()); assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt()); assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt()); - - verifyTaskNode(task1, rootNode); - verifyTaskNode(task2, rootNode); } - // ------------------------------------------------------------------------ - private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception { CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); when(history.getCheckpointById(anyLong())).thenReturn(checkpoint); @@ -262,16 +325,18 @@ public class CheckpointStatsDetailsHandlerTest { return mapper.readTree(json); } - private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) { - long duration = ThreadLocalRandom.current().nextInt(128); - - JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString()); - assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong()); - assertEquals(task.getStateSize(), taskNode.get("state_size").asLong()); - assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong()); - assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong()); - assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt()); - assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt()); + private static void verifyTaskNodes(Collection<TaskStateStats> tasks, JsonNode parentNode) { + for (TaskStateStats task : tasks) { + long duration = ThreadLocalRandom.current().nextInt(128); + + JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString()); + assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong()); + assertEquals(task.getStateSize(), taskNode.get("state_size").asLong()); + assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong()); + assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong()); + assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt()); + assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt()); + } } private static TaskStateStats createTaskStateStats() { http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java index ab7c7a3..1e4a255 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; @@ -34,10 +35,15 @@ import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -51,6 +57,32 @@ import static org.mockito.Mockito.when; public class CheckpointStatsHandlerTest { @Test + public void testArchiver() throws IOException { + JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(); + TestCheckpointStats testCheckpointStats = createTestCheckpointStats(); + when(testCheckpointStats.graph.getJobID()).thenReturn(new JobID()); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(testCheckpointStats.graph); + Assert.assertEquals(3, archives.size()); + + ObjectMapper mapper = new ObjectMapper(); + + Iterator<ArchivedJson> iterator = archives.iterator(); + ArchivedJson archive1 = iterator.next(); + Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.inProgress.getCheckpointId(), archive1.getPath()); + compareInProgressCheckpoint(testCheckpointStats.inProgress, mapper.readTree(archive1.getJson())); + + ArchivedJson archive2 = iterator.next(); + Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.completedSavepoint.getCheckpointId(), archive2.getPath()); + compareCompletedSavepoint(testCheckpointStats.completedSavepoint, mapper.readTree(archive2.getJson())); + + ArchivedJson archive3 = iterator.next(); + Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.failed.getCheckpointId(), archive3.getPath()); + compareFailedCheckpoint(testCheckpointStats.failed, mapper.readTree(archive3.getJson())); + } + + + @Test public void testGetPaths() { CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); @@ -63,6 +95,18 @@ public class CheckpointStatsHandlerTest { */ @Test public void testCheckpointStatsRequest() throws Exception { + TestCheckpointStats testCheckpointStats = createTestCheckpointStats(); + + CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class)); + String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap()); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(json); + + compareCheckpointStats(testCheckpointStats, rootNode); + } + + private static TestCheckpointStats createTestCheckpointStats() { // Counts CheckpointStatsCounts counts = mock(CheckpointStatsCounts.class); when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L); @@ -104,7 +148,7 @@ public class CheckpointStatsHandlerTest { when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path"); CompletedCheckpointStats latestSavepoint = mock(CompletedCheckpointStats.class); - when(latestSavepoint.getCheckpointId()).thenReturn(1992139L); + when(latestSavepoint.getCheckpointId()).thenReturn(1992140L); when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L); when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L); when(latestSavepoint.getStateSize()).thenReturn(111939272822L); @@ -133,7 +177,7 @@ public class CheckpointStatsHandlerTest { List<AbstractCheckpointStats> checkpoints = new ArrayList<>(); PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class); - when(inProgress.getCheckpointId()).thenReturn(1992139L); + when(inProgress.getCheckpointId()).thenReturn(1992141L); when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L); @@ -189,12 +233,15 @@ public class CheckpointStatsHandlerTest { AccessExecutionGraph graph = mock(AccessExecutionGraph.class); when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class)); - String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(json); + return new TestCheckpointStats( + graph, counts, stateSizeSummary, durationSummary, alignmentBufferedSummary, summary, + latestCompleted, latestSavepoint, latestFailed, latestRestored, inProgress, + completedSavepoint, failed, history, snapshot + ); + } + private static void compareCheckpointStats(TestCheckpointStats checkpointStats, JsonNode rootNode) { + CheckpointStatsCounts counts = checkpointStats.counts; JsonNode countNode = rootNode.get("counts"); assertEquals(counts.getNumberOfRestoredCheckpoints(), countNode.get("restored").asLong()); assertEquals(counts.getTotalNumberOfCheckpoints(), countNode.get("total").asLong()); @@ -202,22 +249,26 @@ public class CheckpointStatsHandlerTest { assertEquals(counts.getNumberOfCompletedCheckpoints(), countNode.get("completed").asLong()); assertEquals(counts.getNumberOfFailedCheckpoints(), countNode.get("failed").asLong()); + MinMaxAvgStats stateSizeSummary = checkpointStats.stateSizeSummary; JsonNode summaryNode = rootNode.get("summary"); JsonNode sizeSummaryNode = summaryNode.get("state_size"); assertEquals(stateSizeSummary.getMinimum(), sizeSummaryNode.get("min").asLong()); assertEquals(stateSizeSummary.getMaximum(), sizeSummaryNode.get("max").asLong()); assertEquals(stateSizeSummary.getAverage(), sizeSummaryNode.get("avg").asLong()); + MinMaxAvgStats durationSummary = checkpointStats.durationSummary; JsonNode durationSummaryNode = summaryNode.get("end_to_end_duration"); assertEquals(durationSummary.getMinimum(), durationSummaryNode.get("min").asLong()); assertEquals(durationSummary.getMaximum(), durationSummaryNode.get("max").asLong()); assertEquals(durationSummary.getAverage(), durationSummaryNode.get("avg").asLong()); + MinMaxAvgStats alignmentBufferedSummary = checkpointStats.alignmentBufferedSummary; JsonNode alignmentBufferedNode = summaryNode.get("alignment_buffered"); assertEquals(alignmentBufferedSummary.getMinimum(), alignmentBufferedNode.get("min").asLong()); assertEquals(alignmentBufferedSummary.getMaximum(), alignmentBufferedNode.get("max").asLong()); assertEquals(alignmentBufferedSummary.getAverage(), alignmentBufferedNode.get("avg").asLong()); + CompletedCheckpointStats latestCompleted = checkpointStats.latestCompleted; JsonNode latestNode = rootNode.get("latest"); JsonNode latestCheckpointNode = latestNode.get("completed"); assertEquals(latestCompleted.getCheckpointId(), latestCheckpointNode.get("id").asLong()); @@ -228,6 +279,7 @@ public class CheckpointStatsHandlerTest { assertEquals(latestCompleted.getAlignmentBuffered(), latestCheckpointNode.get("alignment_buffered").asLong()); assertEquals(latestCompleted.getExternalPath(), latestCheckpointNode.get("external_path").asText()); + CompletedCheckpointStats latestSavepoint = checkpointStats.latestSavepoint; JsonNode latestSavepointNode = latestNode.get("savepoint"); assertEquals(latestSavepoint.getCheckpointId(), latestSavepointNode.get("id").asLong()); assertEquals(latestSavepoint.getTriggerTimestamp(), latestSavepointNode.get("trigger_timestamp").asLong()); @@ -237,6 +289,7 @@ public class CheckpointStatsHandlerTest { assertEquals(latestSavepoint.getAlignmentBuffered(), latestSavepointNode.get("alignment_buffered").asLong()); assertEquals(latestSavepoint.getExternalPath(), latestSavepointNode.get("external_path").asText()); + FailedCheckpointStats latestFailed = checkpointStats.latestFailed; JsonNode latestFailedNode = latestNode.get("failed"); assertEquals(latestFailed.getCheckpointId(), latestFailedNode.get("id").asLong()); assertEquals(latestFailed.getTriggerTimestamp(), latestFailedNode.get("trigger_timestamp").asLong()); @@ -247,6 +300,7 @@ public class CheckpointStatsHandlerTest { assertEquals(latestFailed.getFailureTimestamp(), latestFailedNode.get("failure_timestamp").asLong()); assertEquals(latestFailed.getFailureMessage(), latestFailedNode.get("failure_message").asText()); + RestoredCheckpointStats latestRestored = checkpointStats.latestRestored; JsonNode latestRestoredNode = latestNode.get("restored"); assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong()); assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong()); @@ -259,6 +313,25 @@ public class CheckpointStatsHandlerTest { assertTrue(it.hasNext()); JsonNode inProgressNode = it.next(); + PendingCheckpointStats inProgress = checkpointStats.inProgress; + compareInProgressCheckpoint(inProgress, inProgressNode); + + assertTrue(it.hasNext()); + JsonNode completedSavepointNode = it.next(); + + CompletedCheckpointStats completedSavepoint = checkpointStats.completedSavepoint; + compareCompletedSavepoint(completedSavepoint, completedSavepointNode); + + assertTrue(it.hasNext()); + JsonNode failedNode = it.next(); + + FailedCheckpointStats failed = checkpointStats.failed; + compareFailedCheckpoint(failed, failedNode); + + assertFalse(it.hasNext()); + } + + private static void compareInProgressCheckpoint(PendingCheckpointStats inProgress, JsonNode inProgressNode) { assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong()); assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText()); assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean()); @@ -269,10 +342,9 @@ public class CheckpointStatsHandlerTest { assertEquals(inProgress.getAlignmentBuffered(), inProgressNode.get("alignment_buffered").asLong()); assertEquals(inProgress.getNumberOfSubtasks(), inProgressNode.get("num_subtasks").asInt()); assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), inProgressNode.get("num_acknowledged_subtasks").asInt()); + } - assertTrue(it.hasNext()); - JsonNode completedSavepointNode = it.next(); - + private static void compareCompletedSavepoint(CompletedCheckpointStats completedSavepoint, JsonNode completedSavepointNode) { assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong()); assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText()); assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean()); @@ -286,10 +358,9 @@ public class CheckpointStatsHandlerTest { assertEquals(completedSavepoint.getExternalPath(), completedSavepointNode.get("external_path").asText()); assertEquals(completedSavepoint.isDiscarded(), completedSavepointNode.get("discarded").asBoolean()); + } - assertTrue(it.hasNext()); - JsonNode failedNode = it.next(); - + private static void compareFailedCheckpoint(FailedCheckpointStats failed, JsonNode failedNode) { assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong()); assertEquals(failed.getStatus().toString(), failedNode.get("status").asText()); assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean()); @@ -303,7 +374,56 @@ public class CheckpointStatsHandlerTest { assertEquals(failed.getFailureTimestamp(), failedNode.get("failure_timestamp").asLong()); assertEquals(failed.getFailureMessage(), failedNode.get("failure_message").asText()); - - assertFalse(it.hasNext()); + } + + private static class TestCheckpointStats { + public final AccessExecutionGraph graph; + public final CheckpointStatsCounts counts; + public final MinMaxAvgStats stateSizeSummary; + public final MinMaxAvgStats durationSummary; + public final MinMaxAvgStats alignmentBufferedSummary; + public final CompletedCheckpointStatsSummary summary; + public final CompletedCheckpointStats latestCompleted; + public final CompletedCheckpointStats latestSavepoint; + public final FailedCheckpointStats latestFailed; + public final RestoredCheckpointStats latestRestored; + public final PendingCheckpointStats inProgress; + public final CompletedCheckpointStats completedSavepoint; + public final FailedCheckpointStats failed; + public final CheckpointStatsHistory history; + public final CheckpointStatsSnapshot snapshot; + + public TestCheckpointStats( + AccessExecutionGraph graph, + CheckpointStatsCounts counts, + MinMaxAvgStats stateSizeSummary, + MinMaxAvgStats durationSummary, + MinMaxAvgStats alignmentBufferedSummary, + CompletedCheckpointStatsSummary summary, + CompletedCheckpointStats latestCompleted, + CompletedCheckpointStats latestSavepoint, + FailedCheckpointStats latestFailed, + RestoredCheckpointStats latestRestored, + PendingCheckpointStats inProgress, + CompletedCheckpointStats completedSavepoint, + FailedCheckpointStats failed, + CheckpointStatsHistory history, + CheckpointStatsSnapshot snapshot) { + this.graph = graph; + this.counts = counts; + this.stateSizeSummary = stateSizeSummary; + this.durationSummary = durationSummary; + this.alignmentBufferedSummary = alignmentBufferedSummary; + this.summary = summary; + this.latestCompleted = latestCompleted; + this.latestSavepoint = latestSavepoint; + this.latestFailed = latestFailed; + this.latestRestored = latestRestored; + this.inProgress = inProgress; + this.completedSavepoint = completedSavepoint; + this.failed = failed; + this.history = history; + this.snapshot = snapshot; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java index 26433fa..bbab621 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; @@ -31,9 +32,13 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + import org.junit.Assert; import org.junit.Test; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -55,6 +60,42 @@ import static org.mockito.Mockito.when; public class CheckpointStatsSubtaskDetailsHandlerTest { @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(); + ObjectMapper mapper = new ObjectMapper(); + + PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class); + when(checkpoint.getCheckpointId()).thenReturn(1992139L); + when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); + when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration + + TaskStateStats task = createTaskStateStats(1237); + when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task)); + + CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); + when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint)); + CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); + when(snapshot.getHistory()).thenReturn(history); + + AccessExecutionGraph graph = mock(AccessExecutionGraph.class); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); + when(graph.getJobID()).thenReturn(new JobID()); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals( + "/jobs/" + graph.getJobID() + "/checkpoints/details/" + checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(), + archive.getPath()); + JsonNode rootNode = mapper.readTree(archive.getJson()); + assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); + assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); + + verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp()); + } + + @Test public void testGetPaths() { CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0)); String[] paths = handler.getPaths(); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java index 0340d87..ed339ed 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java @@ -105,7 +105,7 @@ public class ArchivedJobGenerationUtils { originalAttempt = new ArchivedExecutionBuilder() .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9}) .setParallelSubtaskIndex(1) - .setAttemptNumber(3) + .setAttemptNumber(0) .setAssignedResourceLocation(location) .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) .setState(ExecutionState.FINISHED) http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java new file mode 100644 index 0000000..22e011c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.util.Preconditions; + +/** + * A simple container for a handler's JSON response and the REST URLs for which the response would've been returned. + * + * These are created by {@link JsonArchivist}s, and used by the {@link MemoryArchivist} to create a directory structure + * resembling the REST API. + */ +public class ArchivedJson { + private final String path; + private final String json; + + public ArchivedJson(String path, String json) { + this.path = Preconditions.checkNotNull(path); + this.json = Preconditions.checkNotNull(json); + } + + public String getPath() { + return path; + } + + public String getJson() { + return json; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java new file mode 100644 index 0000000..a87cc47 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for all classes that want to participate in the archiving of job-related json responses. + * + * Note that all JsonArchivists that are to be used for the history server must be added + * to {@link WebRuntimeMonitor#getArchivers()}. + */ +public interface JsonArchivist { + + /** + * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their respective REST URL + * for a given job. + * + * The collection should contain one entry for every response that could be generated for the given + * job, for example one entry for each task. The REST URLs should be unique and must not contain placeholders. + * + * @param graph AccessExecutionGraph for which the responses should be generated + * + * @return Collection containing an ArchivedJson for every response that could be generated for the given job + * @throws IOException thrown if the JSON generation fails + */ + Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException; +}
