Repository: flink Updated Branches: refs/heads/master 126fb1779 -> dcfa3fbb0
[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker This closes #3215. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcfa3fbb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcfa3fbb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcfa3fbb Branch: refs/heads/master Commit: dcfa3fbb0f17400ebf823e10f803cde8563fff4a Parents: 126fb17 Author: Ufuk Celebi <u...@apache.org> Authored: Wed Jan 25 15:42:24 2017 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Mon Jan 30 16:33:22 2017 +0100 ---------------------------------------------------------------------- .../checkpoints/CheckpointConfigHandler.java | 8 +- .../CheckpointStatsDetailsHandler.java | 7 +- .../CheckpointStatsDetailsSubtasksHandler.java | 7 +- .../checkpoints/CheckpointStatsHandler.java | 7 +- .../CheckpointConfigHandlerTest.java | 13 +-- .../CheckpointStatsDetailsHandlerTest.java | 13 +-- .../checkpoints/CheckpointStatsHandlerTest.java | 5 +- ...heckpointStatsSubtaskDetailsHandlerTest.java | 13 +-- .../checkpoint/AbstractCheckpointStats.java | 5 +- .../checkpoint/CheckpointStatsHistory.java | 4 +- .../checkpoint/CheckpointStatsTracker.java | 16 ++-- .../checkpoint/CompletedCheckpointStats.java | 26 +++--- .../checkpoint/FailedCheckpointStats.java | 24 +++--- .../checkpoint/PendingCheckpointStats.java | 4 +- .../checkpoint/RestoredCheckpointStats.java | 2 +- .../runtime/checkpoint/SubtaskStateStats.java | 8 +- .../runtime/checkpoint/TaskStateStats.java | 9 ++- .../executiongraph/AccessExecutionGraph.java | 20 +++-- .../executiongraph/ArchivedExecutionGraph.java | 54 ++++++++----- .../runtime/executiongraph/ExecutionGraph.java | 22 ++++- .../tasks/ExternalizedCheckpointSettings.java | 2 + .../checkpoint/CheckpointStatsHistoryTest.java | 1 - .../checkpoint/CheckpointStatsSnapshotTest.java | 84 ++++++++++++++++++++ .../checkpoint/CompletedCheckpointTest.java | 35 ++++++++ .../checkpoint/FailedCheckpointStatsTest.java | 40 ++++++++++ .../checkpoint/PendingCheckpointStatsTest.java | 38 +++++++-- .../checkpoint/SubtaskStateStatsTest.java | 36 +++++++++ .../runtime/checkpoint/TaskStateStatsTest.java | 46 ++++++++++- .../ArchivedExecutionGraphTest.java | 7 +- .../tasks/JobSnapshottingSettingsTest.java | 59 ++++++++++++++ 30 files changed, 490 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java index 1ad5e65..be0d283 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; @@ -42,10 +41,13 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + JobSnapshottingSettings settings = graph.getJobSnapshottingSettings(); - CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker(); - JobSnapshottingSettings settings = tracker.getSnapshottingSettings(); + if (settings == null) { + return "{}"; + } gen.writeStartObject(); { http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java index 6bb8300..33d6cf7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; @@ -54,8 +53,10 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest return "{}"; } - CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker(); - CheckpointStatsSnapshot snapshot = tracker.createSnapshot(); + CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); + if (snapshot == null) { + return "{}"; + } AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index 00195b5..d55467f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; import org.apache.flink.runtime.checkpoint.SubtaskStateStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; @@ -72,8 +71,10 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap return "{}"; } - CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker(); - CheckpointStatsSnapshot snapshot = tracker.createSnapshot(); + CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); + if (snapshot == null) { + return "{}"; + } AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java index 71f3637..8aab5fa 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; @@ -54,8 +53,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); - CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker(); - CheckpointStatsSnapshot snapshot = tracker.createSnapshot(); + CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); + if (snapshot == null) { + return "{}"; + } gen.writeStartObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java index 410e044..e517c3c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; @@ -60,9 +59,7 @@ public class CheckpointConfigHandlerTest { true); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.getSnapshottingSettings()).thenReturn(settings); + when(graph.getJobSnapshottingSettings()).thenReturn(settings); CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); @@ -98,9 +95,7 @@ public class CheckpointConfigHandlerTest { false); // at least once AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.getSnapshottingSettings()).thenReturn(settings); + when(graph.getJobSnapshottingSettings()).thenReturn(settings); CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); @@ -130,9 +125,7 @@ public class CheckpointConfigHandlerTest { false); // at least once AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.getSnapshottingSettings()).thenReturn(settings); + when(graph.getJobSnapshottingSettings()).thenReturn(settings); CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java index 17c8558..fb5cfc5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; @@ -89,9 +88,7 @@ public class CheckpointStatsDetailsHandlerTest { when(snapshot.getHistory()).thenReturn(history); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.createSnapshot()).thenReturn(snapshot); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0)); Map<String, String> params = new HashMap<>(); @@ -238,16 +235,14 @@ public class CheckpointStatsDetailsHandlerTest { // ------------------------------------------------------------------------ - static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception { + private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception { CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); when(history.getCheckpointById(anyLong())).thenReturn(checkpoint); CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); when(snapshot.getHistory()).thenReturn(history); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.createSnapshot()).thenReturn(snapshot); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0)); Map<String, String> params = new HashMap<>(); @@ -258,7 +253,7 @@ public class CheckpointStatsDetailsHandlerTest { return mapper.readTree(json); } - static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) { + private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) { long duration = ThreadLocalRandom.current().nextInt(128); JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java index 8274b36..23a1900 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; @@ -179,9 +178,7 @@ public class CheckpointStatsHandlerTest { when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.createSnapshot()).thenReturn(snapshot); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class)); String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java index 8b7201d..571adad 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; import org.apache.flink.runtime.checkpoint.SubtaskStateStats; @@ -129,9 +128,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest { when(snapshot.getHistory()).thenReturn(history); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.createSnapshot()).thenReturn(snapshot); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0)); Map<String, String> params = new HashMap<>(); @@ -186,9 +183,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest { when(snapshot.getHistory()).thenReturn(history); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.createSnapshot()).thenReturn(snapshot); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0)); Map<String, String> params = new HashMap<>(); @@ -209,9 +204,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest { when(snapshot.getHistory()).thenReturn(history); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); - when(tracker.createSnapshot()).thenReturn(snapshot); + when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0)); Map<String, String> params = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java index 6c261a5..5b3c7c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nullable; +import java.io.Serializable; import java.util.Collection; import java.util.Map; @@ -30,7 +31,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Base class for checkpoint statistics. */ -public abstract class AbstractCheckpointStats { +public abstract class AbstractCheckpointStats implements Serializable { + + private static final long serialVersionUID = 1041218202028265151L; /** ID of this checkpoint. */ final long checkpointId; http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java index 56fc9c1..13ce642 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java @@ -307,7 +307,9 @@ public class CheckpointStatsHistory implements Serializable { * * <p>The iteration order is in reverse insertion order. */ - private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats> { + private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable { + + private static final long serialVersionUID = 726376482426055490L; /** Copy of the checkpointsArray array at the point when this iterable was created. */ private final AbstractCheckpointStats[] checkpointsArray; http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index 92f707f..d324c25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import javax.annotation.Nullable; -import java.io.Serializable; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -52,9 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <p>The statistics are accessed via {@link #createSnapshot()} and exposed via * both the web frontend and the {@link Metric} system. */ -public class CheckpointStatsTracker implements Serializable { - - private static final long serialVersionUID = 1694085244807339288L; +public class CheckpointStatsTracker { /** * Lock used to update stats and creating snapshots. Updates always happen @@ -67,9 +64,6 @@ public class CheckpointStatsTracker implements Serializable { */ private final ReentrantLock statsReadWriteLock = new ReentrantLock(); - /** The job vertices taking part in the checkpoints. */ - private final List<ExecutionJobVertex> jobVertices; - /** Total number of subtasks to checkpoint. */ private final int totalSubtaskCount; @@ -85,6 +79,9 @@ public class CheckpointStatsTracker implements Serializable { /** History of checkpoints. */ private final CheckpointStatsHistory history; + /** The job vertices taking part in the checkpoints. */ + private final transient List<ExecutionJobVertex> jobVertices; + /** The latest restored checkpoint. */ @Nullable private RestoredCheckpointStats latestRestoredCheckpoint; @@ -217,6 +214,11 @@ public class CheckpointStatsTracker implements Serializable { return pending; } + /** + * Callback when a checkpoint is restored. + * + * @param restored The restored checkpoint stats. + */ void reportRestoredCheckpoint(RestoredCheckpointStats restored) { checkNotNull(restored, "Restored checkpoint"); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java index 4d2d995..f6b6aed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java @@ -35,8 +35,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class CompletedCheckpointStats extends AbstractCheckpointStats { - /** Callback for the {@link CompletedCheckpoint} instance to notify about discard. */ - private final DiscardCallback discardCallback; + private static final long serialVersionUID = 138833868551861343L; /** Total checkpoint state size over all subtasks. */ private final long stateSize; @@ -69,16 +68,16 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { * @param externalPath Optional external path if persisted externally. */ CompletedCheckpointStats( - long checkpointId, - long triggerTimestamp, - CheckpointProperties props, - int totalSubtaskCount, - Map<JobVertexID, TaskStateStats> taskStats, - int numAcknowledgedSubtasks, - long stateSize, - long alignmentBuffered, - SubtaskStateStats latestAcknowledgedSubtask, - @Nullable String externalPath) { + long checkpointId, + long triggerTimestamp, + CheckpointProperties props, + int totalSubtaskCount, + Map<JobVertexID, TaskStateStats> taskStats, + int numAcknowledgedSubtasks, + long stateSize, + long alignmentBuffered, + SubtaskStateStats latestAcknowledgedSubtask, + @Nullable String externalPath) { super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats); checkArgument(numAcknowledgedSubtasks == totalSubtaskCount, "Did not acknowledge all subtasks."); @@ -87,7 +86,6 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { this.alignmentBuffered = alignmentBuffered; this.latestAcknowledgedSubtask = checkNotNull(latestAcknowledgedSubtask); this.externalPath = externalPath; - this.discardCallback = new DiscardCallback(); } @Override @@ -145,7 +143,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { * @return Callback for the {@link CompletedCheckpoint}. */ DiscardCallback getDiscardCallback() { - return discardCallback; + return new DiscardCallback(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java index 83d7c3d..2f596a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java @@ -32,6 +32,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ public class FailedCheckpointStats extends AbstractCheckpointStats { + private static final long serialVersionUID = 8000748529515900106L; + /** Number of acknowledged tasks. */ private final int numAcknowledgedSubtasks; @@ -71,17 +73,17 @@ public class FailedCheckpointStats extends AbstractCheckpointStats { * @param cause Cause of the checkpoint failure or <code>null</code>. */ FailedCheckpointStats( - long checkpointId, - long triggerTimestamp, - CheckpointProperties props, - int totalSubtaskCount, - Map<JobVertexID, TaskStateStats> taskStats, - int numAcknowledgedSubtasks, - long stateSize, - long alignmentBuffered, - long failureTimestamp, - @Nullable SubtaskStateStats latestAcknowledgedSubtask, - @Nullable Throwable cause) { + long checkpointId, + long triggerTimestamp, + CheckpointProperties props, + int totalSubtaskCount, + Map<JobVertexID, TaskStateStats> taskStats, + int numAcknowledgedSubtasks, + long stateSize, + long alignmentBuffered, + long failureTimestamp, + @Nullable SubtaskStateStats latestAcknowledgedSubtask, + @Nullable Throwable cause) { super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats); checkArgument(numAcknowledgedSubtasks >= 0, "Negative number of ACKs"); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java index e6fa80f..0f32250 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java @@ -42,8 +42,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class PendingCheckpointStats extends AbstractCheckpointStats { + private static final long serialVersionUID = -973959257699390327L; + /** Tracker callback when the pending checkpoint is finalized or aborted. */ - private final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback; + private transient final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback; /** The current number of acknowledged subtasks. */ private volatile int currentNumAcknowledgedSubtasks; http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java index c21937a..8b8a5e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java @@ -56,7 +56,7 @@ public class RestoredCheckpointStats implements Serializable { long checkpointId, CheckpointProperties props, long restoreTimestamp, - String externalPath) { + @Nullable String externalPath) { this.checkpointId = checkpointId; this.props = checkNotNull(props, "Checkpoint Properties"); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java index 3a66032..ee9e287 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint; +import java.io.Serializable; + import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -30,8 +32,10 @@ import static org.apache.flink.util.Preconditions.checkArgument; * * <p>This is the smallest immutable unit of the stats. */ -public class SubtaskStateStats { - +public class SubtaskStateStats implements Serializable { + + private static final long serialVersionUID = 8928594531621862214L; + /** Index of this sub task. */ private final int subtaskIndex; http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java index fc118d9..2f779a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nullable; +import java.io.Serializable; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -29,7 +30,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Statistics for a single task/operator that gathers all statistics of its * subtasks and provides summary statistics about all subtasks. */ -public class TaskStateStats { +public class TaskStateStats implements Serializable { + + private static final long serialVersionUID = 531803101206574444L; /** ID of the task the stats belong to. */ private final JobVertexID jobVertexId; @@ -195,7 +198,9 @@ public class TaskStateStats { /** * Summary of the subtask stats of a single task/operator. */ - public static class TaskStateStatsSummary { + public static class TaskStateStatsSummary implements Serializable { + + private static final long serialVersionUID = 1009476026522091909L; private MinMaxAvgStats stateSize = new MinMaxAvgStats(); private MinMaxAvgStats ackTimestamp = new MinMaxAvgStats(); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index 3490dc8..18c2ec2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -17,13 +17,14 @@ */ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; -import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.util.SerializedValue; import java.io.IOException; @@ -115,11 +116,20 @@ public interface AccessExecutionGraph { CheckpointCoordinator getCheckpointCoordinator(); /** - * Returns the {@link CheckpointStatsTracker} for this execution graph. + * Returns the {@link JobSnapshottingSettings} or <code>null</code> if + * checkpointing is disabled. + * + * @return JobSnapshottingSettings for this execution graph + */ + JobSnapshottingSettings getJobSnapshottingSettings(); + + /** + * Returns a snapshot of the checkpoint statistics or <code>null</code> if + * checkpointing is disabled. * - * @return CheckpointStatsTracker for thie execution graph + * @return Snapshot of the checkpoint statistics for this execution graph */ - CheckpointStatsTracker getCheckpointStatsTracker(); + CheckpointStatsSnapshot getCheckpointStatsSnapshot(); /** * Returns the {@link ArchivedExecutionConfig} for this execution graph. http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index 440ecda..334b0d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -21,11 +21,13 @@ import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.util.SerializedValue; +import javax.annotation.Nullable; import java.io.Serializable; import java.util.Collections; import java.util.Iterator; @@ -34,6 +36,7 @@ import java.util.Map; import java.util.NoSuchElementException; public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable { + private static final long serialVersionUID = 7231383912742578428L; // -------------------------------------------------------------------------------------------- @@ -76,23 +79,29 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl private final ArchivedExecutionConfig archivedExecutionConfig; private final boolean isStoppable; private final Map<String, SerializedValue<Object>> serializedUserAccumulators; - private final CheckpointStatsTracker tracker; + + @Nullable + private final JobSnapshottingSettings jobSnapshottingSettings; + + @Nullable + private final CheckpointStatsSnapshot checkpointStatsSnapshot; public ArchivedExecutionGraph( - JobID jobID, - String jobName, - Map<JobVertexID, ArchivedExecutionJobVertex> tasks, - List<ArchivedExecutionJobVertex> verticesInCreationOrder, - long[] stateTimestamps, - JobStatus state, - String failureCause, - String jsonPlan, - StringifiedAccumulatorResult[] archivedUserAccumulators, - Map<String, SerializedValue<Object>> serializedUserAccumulators, - ArchivedExecutionConfig executionConfig, - boolean isStoppable, - CheckpointStatsTracker tracker - ) { + JobID jobID, + String jobName, + Map<JobVertexID, ArchivedExecutionJobVertex> tasks, + List<ArchivedExecutionJobVertex> verticesInCreationOrder, + long[] stateTimestamps, + JobStatus state, + String failureCause, + String jsonPlan, + StringifiedAccumulatorResult[] archivedUserAccumulators, + Map<String, SerializedValue<Object>> serializedUserAccumulators, + ArchivedExecutionConfig executionConfig, + boolean isStoppable, + @Nullable JobSnapshottingSettings jobSnapshottingSettings, + @Nullable CheckpointStatsSnapshot checkpointStatsSnapshot) { + this.jobID = jobID; this.jobName = jobName; this.tasks = tasks; @@ -105,10 +114,12 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl this.serializedUserAccumulators = serializedUserAccumulators; this.archivedExecutionConfig = executionConfig; this.isStoppable = isStoppable; - this.tracker = tracker; + this.jobSnapshottingSettings = jobSnapshottingSettings; + this.checkpointStatsSnapshot = checkpointStatsSnapshot; } // -------------------------------------------------------------------------------------------- + @Override public String getJsonPlan() { return jsonPlan; @@ -200,8 +211,13 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl } @Override - public CheckpointStatsTracker getCheckpointStatsTracker() { - return tracker; + public JobSnapshottingSettings getJobSnapshottingSettings() { + return jobSnapshottingSettings; + } + + @Override + public CheckpointStatsSnapshot getCheckpointStatsSnapshot() { + return checkpointStatsSnapshot; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 2069638..da4a66e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.execution.ExecutionState; @@ -50,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -425,8 +427,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } @Override - public CheckpointStatsTracker getCheckpointStatsTracker() { - return checkpointStatsTracker; + public JobSnapshottingSettings getJobSnapshottingSettings() { + if (checkpointStatsTracker != null) { + return checkpointStatsTracker.getSnapshottingSettings(); + } else { + return null; + } + } + + @Override + public CheckpointStatsSnapshot getCheckpointStatsSnapshot() { + if (checkpointStatsTracker != null) { + return checkpointStatsTracker.createSnapshot(); + } else { + return null; + } } private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) { @@ -1338,6 +1353,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive serializedUserAccumulators, getArchivedExecutionConfig(), isStoppable(), - getCheckpointStatsTracker()); + getJobSnapshottingSettings(), + getCheckpointStatsSnapshot()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java index 779fc76..b3b487c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java @@ -26,6 +26,8 @@ import org.apache.flink.annotation.Internal; @Internal public class ExternalizedCheckpointSettings implements java.io.Serializable { + private static final long serialVersionUID = -6271691851124392955L; + private static final ExternalizedCheckpointSettings NONE = new ExternalizedCheckpointSettings(false, false); /** Flag indicating whether checkpoints should be externalized. */ http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java index 098fe17..7541806 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java @@ -192,5 +192,4 @@ public class CheckpointStatsHistoryTest { return failed; } - } http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java new file mode 100644 index 0000000..6500369 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CheckpointStatsSnapshotTest { + + /** + * Tests that the snapshot is actually serializable. + */ + @Test + public void testIsJavaSerializable() throws Exception { + CheckpointStatsCounts counts = new CheckpointStatsCounts(); + counts.incrementInProgressCheckpoints(); + counts.incrementInProgressCheckpoints(); + counts.incrementInProgressCheckpoints(); + counts.incrementCompletedCheckpoints(); + counts.incrementFailedCheckpoints(); + counts.incrementRestoredCheckpoints(); + + CompletedCheckpointStatsSummary summary = new CompletedCheckpointStatsSummary(); + summary.updateSummary(createCompletedCheckpointsStats(12398, 9919, 12)); + summary.updateSummary(createCompletedCheckpointsStats(2221, 3333, 9122)); + + CheckpointStatsHistory history = new CheckpointStatsHistory(1); + RestoredCheckpointStats restored = new RestoredCheckpointStats(1, CheckpointProperties.forStandardCheckpoint(), 99119, null); + + CheckpointStatsSnapshot snapshot = new CheckpointStatsSnapshot( + counts, + summary, + history, + restored); + + CheckpointStatsSnapshot copy = CommonTestUtils.createCopySerializable(snapshot); + + assertEquals(counts.getNumberOfCompletedCheckpoints(), copy.getCounts().getNumberOfCompletedCheckpoints()); + assertEquals(counts.getNumberOfFailedCheckpoints(), copy.getCounts().getNumberOfFailedCheckpoints()); + assertEquals(counts.getNumberOfInProgressCheckpoints(), copy.getCounts().getNumberOfInProgressCheckpoints()); + assertEquals(counts.getNumberOfRestoredCheckpoints(), copy.getCounts().getNumberOfRestoredCheckpoints()); + assertEquals(counts.getTotalNumberOfCheckpoints(), copy.getCounts().getTotalNumberOfCheckpoints()); + + assertEquals(summary.getStateSizeStats().getSum(), copy.getSummaryStats().getStateSizeStats().getSum()); + assertEquals(summary.getEndToEndDurationStats().getSum(), copy.getSummaryStats().getEndToEndDurationStats().getSum()); + assertEquals(summary.getAlignmentBufferedStats().getSum(), copy.getSummaryStats().getAlignmentBufferedStats().getSum()); + + assertEquals(restored.getCheckpointId(), copy.getLatestRestoredCheckpoint().getCheckpointId()); + } + + private CompletedCheckpointStats createCompletedCheckpointsStats( + long stateSize, + long endToEndDuration, + long alignmentBuffered) { + + CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class); + when(completed.getStateSize()).thenReturn(stateSize); + when(completed.getEndToEndDuration()).thenReturn(endToEndDuration); + when(completed.getAlignmentBuffered()).thenReturn(alignmentBuffered); + + return completed; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index e466dc7..0d933ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.Rule; @@ -143,4 +144,38 @@ public class CompletedCheckpointTest { completed.discard(JobStatus.FINISHED); verify(callback, times(1)).notifyDiscardedCheckpoint(); } + + @Test + public void testIsJavaSerializable() throws Exception { + TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3); + TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); + + HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); + taskStats.put(task1.getJobVertexId(), task1); + taskStats.put(task2.getJobVertexId(), task2); + + CompletedCheckpointStats completed = new CompletedCheckpointStats( + 123123123L, + 10123L, + CheckpointProperties.forStandardCheckpoint(), + 1337, + taskStats, + 1337, + 123129837912L, + 123819239812L, + new SubtaskStateStats(123, 213123, 123123, 0, 0, 0, 0), + null); + + CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed); + + assertEquals(completed.getCheckpointId(), copy.getCheckpointId()); + assertEquals(completed.getTriggerTimestamp(), copy.getTriggerTimestamp()); + assertEquals(completed.getProperties(), copy.getProperties()); + assertEquals(completed.getNumberOfSubtasks(), copy.getNumberOfSubtasks()); + assertEquals(completed.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks()); + assertEquals(completed.getEndToEndDuration(), copy.getEndToEndDuration()); + assertEquals(completed.getStateSize(), copy.getStateSize()); + assertEquals(completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex(), copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex()); + assertEquals(completed.getStatus(), copy.getStatus()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java index 683c1c9..f1a56be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java @@ -18,9 +18,11 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.Test; +import java.io.NotSerializableException; import java.util.HashMap; import java.util.Map; @@ -57,4 +59,42 @@ public class FailedCheckpointStatsTest { assertEquals(duration, failed.getEndToEndDuration()); } + + @Test + public void testIsJavaSerializable() throws Exception { + long duration = 123912931293L; + long triggerTimestamp = 10123; + long failureTimestamp = triggerTimestamp + duration; + + Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); + JobVertexID jobVertexId = new JobVertexID(); + taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1)); + + FailedCheckpointStats failed = new FailedCheckpointStats( + 123123123L, + triggerTimestamp, + CheckpointProperties.forStandardCheckpoint(), + 1337, + taskStats, + 3, + 190890123, + 0, + failureTimestamp, + null, + new NotSerializableException("message")); + + FailedCheckpointStats copy = CommonTestUtils.createCopySerializable(failed); + + assertEquals(failed.getCheckpointId(), copy.getCheckpointId()); + assertEquals(failed.getTriggerTimestamp(), copy.getTriggerTimestamp()); + assertEquals(failed.getProperties(), copy.getProperties()); + assertEquals(failed.getNumberOfSubtasks(), copy.getNumberOfSubtasks()); + assertEquals(failed.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks()); + assertEquals(failed.getEndToEndDuration(), copy.getEndToEndDuration()); + assertEquals(failed.getStateSize(), copy.getStateSize()); + assertEquals(failed.getLatestAcknowledgedSubtaskStats(), copy.getLatestAcknowledgedSubtaskStats()); + assertEquals(failed.getStatus(), copy.getStatus()); + assertEquals(failed.getFailureMessage(), copy.getFailureMessage()); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java index 854e106..6c5e8fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.Test; import org.mockito.ArgumentCaptor; import java.util.HashMap; -import java.util.Map; import static junit.framework.TestCase.assertFalse; import static org.junit.Assert.assertEquals; @@ -47,7 +47,7 @@ public class PendingCheckpointStatsTest { TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); int totalSubtaskCount = task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(); - Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); + HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); taskStats.put(task1.getJobVertexId(), task1); taskStats.put(task2.getJobVertexId(), task2); @@ -128,7 +128,7 @@ public class PendingCheckpointStatsTest { TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3); TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); - Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); + HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); taskStats.put(task1.getJobVertexId(), task1); taskStats.put(task2.getJobVertexId(), task2); @@ -165,7 +165,6 @@ public class PendingCheckpointStatsTest { assertNotNull(completed); assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus()); assertFalse(completed.isDiscarded()); - assertEquals(discardCallback, completed.getDiscardCallback()); discardCallback.notifyDiscardedCheckpoint(); assertTrue(completed.isDiscarded()); assertEquals(externalPath, completed.getExternalPath()); @@ -189,7 +188,7 @@ public class PendingCheckpointStatsTest { TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3); TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); - Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); + HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); taskStats.put(task1.getJobVertexId(), task1); taskStats.put(task2.getJobVertexId(), task2); @@ -240,6 +239,35 @@ public class PendingCheckpointStatsTest { assertEquals(task2, failed.getTaskStateStats(task2.getJobVertexId())); } + @Test + public void testIsJavaSerializable() throws Exception { + TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3); + TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); + + HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); + taskStats.put(task1.getJobVertexId(), task1); + taskStats.put(task2.getJobVertexId(), task2); + + PendingCheckpointStats pending = new PendingCheckpointStats( + 123123123L, + 10123L, + CheckpointProperties.forStandardCheckpoint(), + 1337, + taskStats, + mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class)); + + PendingCheckpointStats copy = CommonTestUtils.createCopySerializable(pending); + + assertEquals(pending.getCheckpointId(), copy.getCheckpointId()); + assertEquals(pending.getTriggerTimestamp(), copy.getTriggerTimestamp()); + assertEquals(pending.getProperties(), copy.getProperties()); + assertEquals(pending.getNumberOfSubtasks(), copy.getNumberOfSubtasks()); + assertEquals(pending.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks()); + assertEquals(pending.getEndToEndDuration(), copy.getEndToEndDuration()); + assertEquals(pending.getStateSize(), copy.getStateSize()); + assertEquals(pending.getLatestAcknowledgedSubtaskStats(), copy.getLatestAcknowledgedSubtaskStats()); + assertEquals(pending.getStatus(), copy.getStatus()); + } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java index 75c40c5..8514f33 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -54,4 +55,39 @@ public class SubtaskStateStatsTest { // Trigger timestamp < ack timestamp assertEquals(0, stats.getEndToEndDuration(ackTimestamp + 1)); } + + /** + * Tests that the snapshot is actually serializable. + */ + @Test + public void testIsJavaSerializable() throws Exception { + SubtaskStateStats stats = new SubtaskStateStats( + 0, + Integer.MAX_VALUE + 1L, + Integer.MAX_VALUE + 2L, + Integer.MAX_VALUE + 3L, + Integer.MAX_VALUE + 4L, + Integer.MAX_VALUE + 5L, + Integer.MAX_VALUE + 6L); + + SubtaskStateStats copy = CommonTestUtils.createCopySerializable(stats); + + assertEquals(0, copy.getSubtaskIndex()); + assertEquals(Integer.MAX_VALUE + 1L, copy.getAckTimestamp()); + assertEquals(Integer.MAX_VALUE + 2L, copy.getStateSize()); + assertEquals(Integer.MAX_VALUE + 3L, copy.getSyncCheckpointDuration()); + assertEquals(Integer.MAX_VALUE + 4L, copy.getAsyncCheckpointDuration()); + assertEquals(Integer.MAX_VALUE + 5L, copy.getAlignmentBuffered()); + assertEquals(Integer.MAX_VALUE + 6L, copy.getAlignmentDuration()); + + // Check duration helper + long ackTimestamp = copy.getAckTimestamp(); + long triggerTimestamp = ackTimestamp - 10123; + assertEquals(10123, copy.getEndToEndDuration(triggerTimestamp)); + + // Trigger timestamp < ack timestamp + assertEquals(0, copy.getEndToEndDuration(ackTimestamp + 1)); + + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java index 94edd9e..dd46fe2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.Test; @@ -31,6 +32,8 @@ import static org.junit.Assert.assertTrue; public class TaskStateStatsTest { + private final ThreadLocalRandom rand = ThreadLocalRandom.current(); + /** * Tests that subtask stats are correctly collected. */ @@ -48,8 +51,6 @@ public class TaskStateStatsTest { assertEquals(-1, taskStats.getLatestAckTimestamp()); assertArrayEquals(subtasks, taskStats.getSubtaskStats()); - ThreadLocalRandom rand = ThreadLocalRandom.current(); - long stateSize = 0; long alignmentBuffered = 0; @@ -90,4 +91,45 @@ public class TaskStateStatsTest { assertEquals(subtasks.length, summary.getAlignmentBufferedStats().getCount()); assertEquals(subtasks.length, summary.getAlignmentDurationStats().getCount()); } + + @Test + public void testIsJavaSerializable() throws Exception { + JobVertexID jobVertexId = new JobVertexID(); + SubtaskStateStats[] subtasks = new SubtaskStateStats[7]; + + TaskStateStats taskStats = new TaskStateStats(jobVertexId, subtasks.length); + + long stateSize = 0; + long alignmentBuffered = 0; + + for (int i = 0; i < subtasks.length; i++) { + subtasks[i] = new SubtaskStateStats( + i, + rand.nextInt(128), + rand.nextInt(128), + rand.nextInt(128), + rand.nextInt(128), + rand.nextInt(128), + rand.nextInt(128)); + + stateSize += subtasks[i].getStateSize(); + alignmentBuffered += subtasks[i].getAlignmentBuffered(); + + taskStats.reportSubtaskStats(subtasks[i]); + } + + TaskStateStats copy = CommonTestUtils.createCopySerializable(taskStats); + + assertEquals(stateSize, copy.getStateSize()); + assertEquals(alignmentBuffered, copy.getAlignmentBuffered()); + + TaskStateStats.TaskStateStatsSummary summary = copy.getSummaryStats(); + assertEquals(subtasks.length, summary.getStateSizeStats().getCount()); + assertEquals(subtasks.length, summary.getAckTimestampStats().getCount()); + assertEquals(subtasks.length, summary.getSyncCheckpointDurationStats().getCount()); + assertEquals(subtasks.length, summary.getAsyncCheckpointDurationStats().getCount()); + assertEquals(subtasks.length, summary.getAlignmentBufferedStats().getCount()); + assertEquals(subtasks.length, summary.getAlignmentDurationStats().getCount()); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 6d7427a..9b1064d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -172,11 +172,8 @@ public class ArchivedExecutionGraphTest { // ------------------------------------------------------------------------------------------------------------- // CheckpointStats // ------------------------------------------------------------------------------------------------------------- - CheckpointStatsTracker runtimeStats = runtimeGraph.getCheckpointStatsTracker(); - CheckpointStatsTracker archivedStats = archivedGraph.getCheckpointStatsTracker(); - - CheckpointStatsSnapshot runtimeSnapshot = runtimeStats.createSnapshot(); - CheckpointStatsSnapshot archivedSnapshot = archivedStats.createSnapshot(); + CheckpointStatsSnapshot runtimeSnapshot = runtimeGraph.getCheckpointStatsSnapshot(); + CheckpointStatsSnapshot archivedSnapshot = archivedGraph.getCheckpointStatsSnapshot(); assertEquals(runtimeSnapshot.getSummaryStats().getEndToEndDurationStats().getAverage(), archivedSnapshot.getSummaryStats().getEndToEndDurationStats().getAverage()); assertEquals(runtimeSnapshot.getSummaryStats().getEndToEndDurationStats().getMinimum(), archivedSnapshot.getSummaryStats().getEndToEndDurationStats().getMinimum()); http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java new file mode 100644 index 0000000..667dbca --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class JobSnapshottingSettingsTest { + + /** + * Tests that the settings are actually serializable. + */ + @Test + public void testIsJavaSerializable() throws Exception { + JobSnapshottingSettings settings = new JobSnapshottingSettings( + Arrays.asList(new JobVertexID(), new JobVertexID()), + Arrays.asList(new JobVertexID(), new JobVertexID()), + Arrays.asList(new JobVertexID(), new JobVertexID()), + 1231231, + 1231, + 112, + 12, + ExternalizedCheckpointSettings.externalizeCheckpoints(true), + false); + + JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings); + assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge()); + assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm()); + assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger()); + assertEquals(settings.getCheckpointInterval(), copy.getCheckpointInterval()); + assertEquals(settings.getCheckpointTimeout(), copy.getCheckpointTimeout()); + assertEquals(settings.getMinPauseBetweenCheckpoints(), copy.getMinPauseBetweenCheckpoints()); + assertEquals(settings.getMaxConcurrentCheckpoints(), copy.getMaxConcurrentCheckpoints()); + assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints()); + assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation()); + assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce()); + } +}