Repository: flink
Updated Branches:
  refs/heads/master 126fb1779 -> dcfa3fbb0


[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker

This closes #3215.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcfa3fbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcfa3fbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcfa3fbb

Branch: refs/heads/master
Commit: dcfa3fbb0f17400ebf823e10f803cde8563fff4a
Parents: 126fb17
Author: Ufuk Celebi <u...@apache.org>
Authored: Wed Jan 25 15:42:24 2017 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Mon Jan 30 16:33:22 2017 +0100

----------------------------------------------------------------------
 .../checkpoints/CheckpointConfigHandler.java    |  8 +-
 .../CheckpointStatsDetailsHandler.java          |  7 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |  7 +-
 .../checkpoints/CheckpointStatsHandler.java     |  7 +-
 .../CheckpointConfigHandlerTest.java            | 13 +--
 .../CheckpointStatsDetailsHandlerTest.java      | 13 +--
 .../checkpoints/CheckpointStatsHandlerTest.java |  5 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 13 +--
 .../checkpoint/AbstractCheckpointStats.java     |  5 +-
 .../checkpoint/CheckpointStatsHistory.java      |  4 +-
 .../checkpoint/CheckpointStatsTracker.java      | 16 ++--
 .../checkpoint/CompletedCheckpointStats.java    | 26 +++---
 .../checkpoint/FailedCheckpointStats.java       | 24 +++---
 .../checkpoint/PendingCheckpointStats.java      |  4 +-
 .../checkpoint/RestoredCheckpointStats.java     |  2 +-
 .../runtime/checkpoint/SubtaskStateStats.java   |  8 +-
 .../runtime/checkpoint/TaskStateStats.java      |  9 ++-
 .../executiongraph/AccessExecutionGraph.java    | 20 +++--
 .../executiongraph/ArchivedExecutionGraph.java  | 54 ++++++++-----
 .../runtime/executiongraph/ExecutionGraph.java  | 22 ++++-
 .../tasks/ExternalizedCheckpointSettings.java   |  2 +
 .../checkpoint/CheckpointStatsHistoryTest.java  |  1 -
 .../checkpoint/CheckpointStatsSnapshotTest.java | 84 ++++++++++++++++++++
 .../checkpoint/CompletedCheckpointTest.java     | 35 ++++++++
 .../checkpoint/FailedCheckpointStatsTest.java   | 40 ++++++++++
 .../checkpoint/PendingCheckpointStatsTest.java  | 38 +++++++--
 .../checkpoint/SubtaskStateStatsTest.java       | 36 +++++++++
 .../runtime/checkpoint/TaskStateStatsTest.java  | 46 ++++++++++-
 .../ArchivedExecutionGraphTest.java             |  7 +-
 .../tasks/JobSnapshottingSettingsTest.java      | 59 ++++++++++++++
 30 files changed, 490 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 1ad5e65..be0d283 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -42,10 +41,13 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
        @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
                StringWriter writer = new StringWriter();
+
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+               JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();
 
-               CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
-               JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
+               if (settings == null) {
+                       return "{}";
+               }
 
                gen.writeStartObject();
                {

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 6bb8300..33d6cf7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
@@ -54,8 +53,10 @@ public class CheckpointStatsDetailsHandler extends 
AbstractExecutionGraphRequest
                        return "{}";
                }
 
-               CheckpointStatsTracker tracker = 
graph.getCheckpointStatsTracker();
-               CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
+               if (snapshot == null) {
+                       return "{}";
+               }
 
                AbstractCheckpointStats checkpoint = 
snapshot.getHistory().getCheckpointById(checkpointId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 00195b5..d55467f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -21,7 +21,6 @@ package 
org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
@@ -72,8 +71,10 @@ public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGrap
                        return "{}";
                }
 
-               CheckpointStatsTracker tracker = 
graph.getCheckpointStatsTracker();
-               CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
+               if (snapshot == null) {
+                       return "{}";
+               }
 
                AbstractCheckpointStats checkpoint = 
snapshot.getHistory().getCheckpointById(checkpointId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 71f3637..8aab5fa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -54,8 +53,10 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
 
-               CheckpointStatsTracker tracker = 
graph.getCheckpointStatsTracker();
-               CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
+               if (snapshot == null) {
+                       return "{}";
+               }
 
                gen.writeStartObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index 410e044..e517c3c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -20,7 +20,6 @@ package 
org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -60,9 +59,7 @@ public class CheckpointConfigHandlerTest {
                        true);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.getSnapshottingSettings()).thenReturn(settings);
+               when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
                CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
                String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());
@@ -98,9 +95,7 @@ public class CheckpointConfigHandlerTest {
                        false); // at least once
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.getSnapshottingSettings()).thenReturn(settings);
+               when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
                CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
                String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());
@@ -130,9 +125,7 @@ public class CheckpointConfigHandlerTest {
                        false); // at least once
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.getSnapshottingSettings()).thenReturn(settings);
+               when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
                CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
                String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 17c8558..fb5cfc5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
@@ -89,9 +88,7 @@ public class CheckpointStatsDetailsHandlerTest {
                when(snapshot.getHistory()).thenReturn(history);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.createSnapshot()).thenReturn(snapshot);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
                CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new 
CheckpointStatsCache(0));
                Map<String, String> params = new HashMap<>();
@@ -238,16 +235,14 @@ public class CheckpointStatsDetailsHandlerTest {
 
        // 
------------------------------------------------------------------------
 
-       static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) 
throws Exception {
+       private static JsonNode triggerRequest(AbstractCheckpointStats 
checkpoint) throws Exception {
                CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
                
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
                CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
                when(snapshot.getHistory()).thenReturn(history);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.createSnapshot()).thenReturn(snapshot);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
                CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new 
CheckpointStatsCache(0));
                Map<String, String> params = new HashMap<>();
@@ -258,7 +253,7 @@ public class CheckpointStatsDetailsHandlerTest {
                return mapper.readTree(json);
        }
 
-       static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
+       private static void verifyTaskNode(TaskStateStats task, JsonNode 
parentNode) {
                long duration = ThreadLocalRandom.current().nextInt(128);
 
                JsonNode taskNode = 
parentNode.get("tasks").get(task.getJobVertexId().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 8274b36..23a1900 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -179,9 +178,7 @@ public class CheckpointStatsHandlerTest {
                
when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.createSnapshot()).thenReturn(snapshot);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
                CheckpointStatsHandler handler = new 
CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
                String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index 8b7201d..571adad 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
@@ -129,9 +128,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
                when(snapshot.getHistory()).thenReturn(history);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.createSnapshot()).thenReturn(snapshot);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
                CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new 
CheckpointStatsCache(0));
                Map<String, String> params = new HashMap<>();
@@ -186,9 +183,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
                when(snapshot.getHistory()).thenReturn(history);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.createSnapshot()).thenReturn(snapshot);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
                CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new 
CheckpointStatsCache(0));
                Map<String, String> params = new HashMap<>();
@@ -209,9 +204,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
                when(snapshot.getHistory()).thenReturn(history);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-               when(tracker.createSnapshot()).thenReturn(snapshot);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
                CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new 
CheckpointStatsCache(0));
                Map<String, String> params = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
index 6c261a5..5b3c7c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nullable;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 
@@ -30,7 +31,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Base class for checkpoint statistics.
  */
-public abstract class AbstractCheckpointStats {
+public abstract class AbstractCheckpointStats implements Serializable {
+
+       private static final long serialVersionUID = 1041218202028265151L;
 
        /** ID of this checkpoint. */
        final long checkpointId;

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index 56fc9c1..13ce642 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -307,7 +307,9 @@ public class CheckpointStatsHistory implements Serializable {
         *
         * <p>The iteration order is in reverse insertion order.
         */
-       private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats> {
+       private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable {
+
+               private static final long serialVersionUID = 726376482426055490L;
 
                /** Copy of the checkpointsArray array at the point when this iterable was created. */
                private final AbstractCheckpointStats[] checkpointsArray;

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 92f707f..d324c25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 
 import javax.annotation.Nullable;
-import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
@@ -52,9 +51,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <p>The statistics are accessed via {@link #createSnapshot()} and exposed via
  * both the web frontend and the {@link Metric} system.
  */
-public class CheckpointStatsTracker implements Serializable {
-
-       private static final long serialVersionUID = 1694085244807339288L;
+public class CheckpointStatsTracker {
 
        /**
         * Lock used to update stats and creating snapshots. Updates always 
happen
@@ -67,9 +64,6 @@ public class CheckpointStatsTracker implements Serializable {
         */
        private final ReentrantLock statsReadWriteLock = new ReentrantLock();
 
-       /** The job vertices taking part in the checkpoints. */
-       private final List<ExecutionJobVertex> jobVertices;
-
        /** Total number of subtasks to checkpoint. */
        private final int totalSubtaskCount;
 
@@ -85,6 +79,9 @@ public class CheckpointStatsTracker implements Serializable {
        /** History of checkpoints. */
        private final CheckpointStatsHistory history;
 
+       /** The job vertices taking part in the checkpoints. */
+       private final transient List<ExecutionJobVertex> jobVertices;
+
        /** The latest restored checkpoint. */
        @Nullable
        private RestoredCheckpointStats latestRestoredCheckpoint;
@@ -217,6 +214,11 @@ public class CheckpointStatsTracker implements 
Serializable {
                return pending;
        }
 
+       /**
+        * Callback when a checkpoint is restored.
+        *
+        * @param restored The restored checkpoint stats.
+        */
        void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
                checkNotNull(restored, "Restored checkpoint");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 4d2d995..f6b6aed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -35,8 +35,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CompletedCheckpointStats extends AbstractCheckpointStats {
 
-       /** Callback for the {@link CompletedCheckpoint} instance to notify 
about discard. */
-       private final DiscardCallback discardCallback;
+       private static final long serialVersionUID = 138833868551861343L;
 
        /** Total checkpoint state size over all subtasks. */
        private final long stateSize;
@@ -69,16 +68,16 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
         * @param externalPath Optional external path if persisted externally.
         */
        CompletedCheckpointStats(
-               long checkpointId,
-               long triggerTimestamp,
-               CheckpointProperties props,
-               int totalSubtaskCount,
-               Map<JobVertexID, TaskStateStats> taskStats,
-               int numAcknowledgedSubtasks,
-               long stateSize,
-               long alignmentBuffered,
-               SubtaskStateStats latestAcknowledgedSubtask,
-               @Nullable String externalPath) {
+                       long checkpointId,
+                       long triggerTimestamp,
+                       CheckpointProperties props,
+                       int totalSubtaskCount,
+                       Map<JobVertexID, TaskStateStats> taskStats,
+                       int numAcknowledgedSubtasks,
+                       long stateSize,
+                       long alignmentBuffered,
+                       SubtaskStateStats latestAcknowledgedSubtask,
+                       @Nullable String externalPath) {
 
                super(checkpointId, triggerTimestamp, props, totalSubtaskCount, 
taskStats);
                checkArgument(numAcknowledgedSubtasks == totalSubtaskCount, 
"Did not acknowledge all subtasks.");
@@ -87,7 +86,6 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
                this.alignmentBuffered = alignmentBuffered;
                this.latestAcknowledgedSubtask = 
checkNotNull(latestAcknowledgedSubtask);
                this.externalPath = externalPath;
-               this.discardCallback = new DiscardCallback();
        }
 
        @Override
@@ -145,7 +143,7 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
         * @return Callback for the {@link CompletedCheckpoint}.
         */
        DiscardCallback getDiscardCallback() {
-               return discardCallback;
+               return new DiscardCallback();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
index 83d7c3d..2f596a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -32,6 +32,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  */
 public class FailedCheckpointStats extends AbstractCheckpointStats {
 
+       private static final long serialVersionUID = 8000748529515900106L;
+
        /** Number of acknowledged tasks. */
        private final int numAcknowledgedSubtasks;
 
@@ -71,17 +73,17 @@ public class FailedCheckpointStats extends 
AbstractCheckpointStats {
         * @param cause Cause of the checkpoint failure or <code>null</code>.
         */
        FailedCheckpointStats(
-               long checkpointId,
-               long triggerTimestamp,
-               CheckpointProperties props,
-               int totalSubtaskCount,
-               Map<JobVertexID, TaskStateStats> taskStats,
-               int numAcknowledgedSubtasks,
-               long stateSize,
-               long alignmentBuffered,
-               long failureTimestamp,
-               @Nullable SubtaskStateStats latestAcknowledgedSubtask,
-               @Nullable Throwable cause) {
+                       long checkpointId,
+                       long triggerTimestamp,
+                       CheckpointProperties props,
+                       int totalSubtaskCount,
+                       Map<JobVertexID, TaskStateStats> taskStats,
+                       int numAcknowledgedSubtasks,
+                       long stateSize,
+                       long alignmentBuffered,
+                       long failureTimestamp,
+                       @Nullable SubtaskStateStats latestAcknowledgedSubtask,
+                       @Nullable Throwable cause) {
 
                super(checkpointId, triggerTimestamp, props, totalSubtaskCount, 
taskStats);
                checkArgument(numAcknowledgedSubtasks >= 0, "Negative number of 
ACKs");

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
index e6fa80f..0f32250 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -42,8 +42,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class PendingCheckpointStats extends AbstractCheckpointStats {
 
+       private static final long serialVersionUID = -973959257699390327L;
+
        /** Tracker callback when the pending checkpoint is finalized or aborted. */
-       private final CheckpointStatsTracker.PendingCheckpointStatsCallback 
trackerCallback;
+       private transient final 
CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
 
        /** The current number of acknowledged subtasks. */
        private volatile int currentNumAcknowledgedSubtasks;

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
index c21937a..8b8a5e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
@@ -56,7 +56,7 @@ public class RestoredCheckpointStats implements Serializable {
                        long checkpointId,
                        CheckpointProperties props,
                        long restoreTimestamp,
-                       String externalPath) {
+                       @Nullable String externalPath) {
 
                this.checkpointId = checkpointId;
                this.props = checkNotNull(props, "Checkpoint Properties");

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
index 3a66032..ee9e287 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import java.io.Serializable;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -30,8 +32,10 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>This is the smallest immutable unit of the stats.
  */
-public class SubtaskStateStats {
-       
+public class SubtaskStateStats implements Serializable {
+
+       private static final long serialVersionUID = 8928594531621862214L;
+
        /** Index of this sub task. */
        private final int subtaskIndex;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
index fc118d9..2f779a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nullable;
+import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -29,7 +30,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Statistics for a single task/operator that gathers all statistics of its
  * subtasks and provides summary statistics about all subtasks.
  */
-public class TaskStateStats {
+public class TaskStateStats implements Serializable {
+
+       private static final long serialVersionUID = 531803101206574444L;
 
        /** ID of the task the stats belong to. */
        private final JobVertexID jobVertexId;
@@ -195,7 +198,9 @@ public class TaskStateStats {
        /**
         * Summary of the subtask stats of a single task/operator.
         */
-       public static class TaskStateStatsSummary {
+       public static class TaskStateStatsSummary implements Serializable {
+
+               private static final long serialVersionUID = 
1009476026522091909L;
 
                private MinMaxAvgStats stateSize = new MinMaxAvgStats();
                private MinMaxAvgStats ackTimestamp = new MinMaxAvgStats();

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 3490dc8..18c2ec2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -17,13 +17,14 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
-import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -115,11 +116,20 @@ public interface AccessExecutionGraph {
        CheckpointCoordinator getCheckpointCoordinator();
 
        /**
-        * Returns the {@link CheckpointStatsTracker} for this execution graph.
+        * Returns the {@link JobSnapshottingSettings} or <code>null</code> if
+        * checkpointing is disabled.
+        *
+        * @return JobSnapshottingSettings for this execution graph
+        */
+       JobSnapshottingSettings getJobSnapshottingSettings();
+
+       /**
+        * Returns a snapshot of the checkpoint statistics or <code>null</code> if
+        * checkpointing is disabled.
         *
-        * @return CheckpointStatsTracker for thie execution graph
+        * @return Snapshot of the checkpoint statistics for this execution graph
         */
-       CheckpointStatsTracker getCheckpointStatsTracker();
+       CheckpointStatsSnapshot getCheckpointStatsSnapshot();
 
        /**
         * Returns the {@link ArchivedExecutionConfig} for this execution graph.

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 440ecda..334b0d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -21,11 +21,13 @@ import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.util.SerializedValue;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Iterator;
@@ -34,6 +36,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 
 public class ArchivedExecutionGraph implements AccessExecutionGraph, 
Serializable {
+
        private static final long serialVersionUID = 7231383912742578428L;
        // --------------------------------------------------------------------------------------------
 
@@ -76,23 +79,29 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
        private final ArchivedExecutionConfig archivedExecutionConfig;
        private final boolean isStoppable;
        private final Map<String, SerializedValue<Object>> 
serializedUserAccumulators;
-       private final CheckpointStatsTracker tracker;
+
+       @Nullable
+       private final JobSnapshottingSettings jobSnapshottingSettings;
+
+       @Nullable
+       private final CheckpointStatsSnapshot checkpointStatsSnapshot;
 
        public ArchivedExecutionGraph(
-               JobID jobID,
-               String jobName,
-               Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
-               List<ArchivedExecutionJobVertex> verticesInCreationOrder,
-               long[] stateTimestamps,
-               JobStatus state,
-               String failureCause,
-               String jsonPlan,
-               StringifiedAccumulatorResult[] archivedUserAccumulators,
-               Map<String, SerializedValue<Object>> serializedUserAccumulators,
-               ArchivedExecutionConfig executionConfig,
-               boolean isStoppable,
-               CheckpointStatsTracker tracker
-       ) {
+                       JobID jobID,
+                       String jobName,
+                       Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
+                       List<ArchivedExecutionJobVertex> 
verticesInCreationOrder,
+                       long[] stateTimestamps,
+                       JobStatus state,
+                       String failureCause,
+                       String jsonPlan,
+                       StringifiedAccumulatorResult[] archivedUserAccumulators,
+                       Map<String, SerializedValue<Object>> 
serializedUserAccumulators,
+                       ArchivedExecutionConfig executionConfig,
+                       boolean isStoppable,
+                       @Nullable JobSnapshottingSettings 
jobSnapshottingSettings,
+                       @Nullable CheckpointStatsSnapshot 
checkpointStatsSnapshot) {
+
                this.jobID = jobID;
                this.jobName = jobName;
                this.tasks = tasks;
@@ -105,10 +114,12 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                this.serializedUserAccumulators = serializedUserAccumulators;
                this.archivedExecutionConfig = executionConfig;
                this.isStoppable = isStoppable;
-               this.tracker = tracker;
+               this.jobSnapshottingSettings = jobSnapshottingSettings;
+               this.checkpointStatsSnapshot = checkpointStatsSnapshot;
        }
 
        // --------------------------------------------------------------------------------------------
+
        @Override
        public String getJsonPlan() {
                return jsonPlan;
@@ -200,8 +211,13 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
        }
 
        @Override
-       public CheckpointStatsTracker getCheckpointStatsTracker() {
-               return tracker;
+       public JobSnapshottingSettings getJobSnapshottingSettings() {
+               return jobSnapshottingSettings;
+       }
+
+       @Override
+       public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
+               return checkpointStatsSnapshot;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 2069638..da4a66e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -50,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -425,8 +427,21 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        }
 
        @Override
-       public CheckpointStatsTracker getCheckpointStatsTracker() {
-               return checkpointStatsTracker;
+       public JobSnapshottingSettings getJobSnapshottingSettings() {
+               if (checkpointStatsTracker != null) {
+                       return checkpointStatsTracker.getSnapshottingSettings();
+               } else {
+                       return null;
+               }
+       }
+
+       @Override
+       public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
+               if (checkpointStatsTracker != null) {
+                       return checkpointStatsTracker.createSnapshot();
+               } else {
+                       return null;
+               }
        }
 
        private ExecutionVertex[] 
collectExecutionVertices(List<ExecutionJobVertex> jobVertices) {
@@ -1338,6 +1353,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        serializedUserAccumulators,
                        getArchivedExecutionConfig(),
                        isStoppable(),
-                       getCheckpointStatsTracker());
+                       getJobSnapshottingSettings(),
+                       getCheckpointStatsSnapshot());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
index 779fc76..b3b487c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
@@ -26,6 +26,8 @@ import org.apache.flink.annotation.Internal;
 @Internal
 public class ExternalizedCheckpointSettings implements java.io.Serializable {
 
+       private static final long serialVersionUID = -6271691851124392955L;
+
        private static final ExternalizedCheckpointSettings NONE = new 
ExternalizedCheckpointSettings(false, false);
 
        /** Flag indicating whether checkpoints should be externalized. */

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 098fe17..7541806 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -192,5 +192,4 @@ public class CheckpointStatsHistoryTest {
                return failed;
        }
 
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
new file mode 100644
index 0000000..6500369
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CheckpointStatsSnapshotTest {
+
+       /**
+        * Tests that the snapshot is actually serializable.
+        */
+       @Test
+       public void testIsJavaSerializable() throws Exception {
+               CheckpointStatsCounts counts = new CheckpointStatsCounts();
+               counts.incrementInProgressCheckpoints();
+               counts.incrementInProgressCheckpoints();
+               counts.incrementInProgressCheckpoints();
+               counts.incrementCompletedCheckpoints();
+               counts.incrementFailedCheckpoints();
+               counts.incrementRestoredCheckpoints();
+
+               CompletedCheckpointStatsSummary summary = new 
CompletedCheckpointStatsSummary();
+               summary.updateSummary(createCompletedCheckpointsStats(12398, 
9919, 12));
+               summary.updateSummary(createCompletedCheckpointsStats(2221, 
3333, 9122));
+
+               CheckpointStatsHistory history = new CheckpointStatsHistory(1);
+               RestoredCheckpointStats restored = new 
RestoredCheckpointStats(1, CheckpointProperties.forStandardCheckpoint(), 99119, 
null);
+
+               CheckpointStatsSnapshot snapshot = new CheckpointStatsSnapshot(
+                       counts,
+                       summary,
+                       history,
+                       restored);
+
+               CheckpointStatsSnapshot copy = 
CommonTestUtils.createCopySerializable(snapshot);
+
+               assertEquals(counts.getNumberOfCompletedCheckpoints(), 
copy.getCounts().getNumberOfCompletedCheckpoints());
+               assertEquals(counts.getNumberOfFailedCheckpoints(), 
copy.getCounts().getNumberOfFailedCheckpoints());
+               assertEquals(counts.getNumberOfInProgressCheckpoints(), 
copy.getCounts().getNumberOfInProgressCheckpoints());
+               assertEquals(counts.getNumberOfRestoredCheckpoints(), 
copy.getCounts().getNumberOfRestoredCheckpoints());
+               assertEquals(counts.getTotalNumberOfCheckpoints(), 
copy.getCounts().getTotalNumberOfCheckpoints());
+
+               assertEquals(summary.getStateSizeStats().getSum(), 
copy.getSummaryStats().getStateSizeStats().getSum());
+               assertEquals(summary.getEndToEndDurationStats().getSum(), 
copy.getSummaryStats().getEndToEndDurationStats().getSum());
+               assertEquals(summary.getAlignmentBufferedStats().getSum(), 
copy.getSummaryStats().getAlignmentBufferedStats().getSum());
+
+               assertEquals(restored.getCheckpointId(), 
copy.getLatestRestoredCheckpoint().getCheckpointId());
+       }
+
+       private CompletedCheckpointStats createCompletedCheckpointsStats(
+                       long stateSize,
+                       long endToEndDuration,
+                       long alignmentBuffered) {
+
+               CompletedCheckpointStats completed = 
mock(CompletedCheckpointStats.class);
+               when(completed.getStateSize()).thenReturn(stateSize);
+               
when(completed.getEndToEndDuration()).thenReturn(endToEndDuration);
+               
when(completed.getAlignmentBuffered()).thenReturn(alignmentBuffered);
+
+               return completed;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index e466dc7..0d933ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.junit.Rule;
@@ -143,4 +144,38 @@ public class CompletedCheckpointTest {
                completed.discard(JobStatus.FINISHED);
                verify(callback, times(1)).notifyDiscardedCheckpoint();
        }
+
+       @Test
+       public void testIsJavaSerializable() throws Exception {
+               TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
+               TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
+
+               HashMap<JobVertexID, TaskStateStats> taskStats = new 
HashMap<>();
+               taskStats.put(task1.getJobVertexId(), task1);
+               taskStats.put(task2.getJobVertexId(), task2);
+
+               CompletedCheckpointStats completed = new 
CompletedCheckpointStats(
+                       123123123L,
+                       10123L,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       1337,
+                       taskStats,
+                       1337,
+                       123129837912L,
+                       123819239812L,
+                       new SubtaskStateStats(123, 213123, 123123, 0, 0, 0, 0),
+                       null);
+
+               CompletedCheckpointStats copy = 
CommonTestUtils.createCopySerializable(completed);
+
+               assertEquals(completed.getCheckpointId(), 
copy.getCheckpointId());
+               assertEquals(completed.getTriggerTimestamp(), 
copy.getTriggerTimestamp());
+               assertEquals(completed.getProperties(), copy.getProperties());
+               assertEquals(completed.getNumberOfSubtasks(), 
copy.getNumberOfSubtasks());
+               assertEquals(completed.getNumberOfAcknowledgedSubtasks(), 
copy.getNumberOfAcknowledgedSubtasks());
+               assertEquals(completed.getEndToEndDuration(), 
copy.getEndToEndDuration());
+               assertEquals(completed.getStateSize(), copy.getStateSize());
+               
assertEquals(completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex(), 
copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex());
+               assertEquals(completed.getStatus(), copy.getStatus());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
index 683c1c9..f1a56be 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.junit.Test;
 
+import java.io.NotSerializableException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -57,4 +59,42 @@ public class FailedCheckpointStatsTest {
 
                assertEquals(duration, failed.getEndToEndDuration());
        }
+
+       @Test
+       public void testIsJavaSerializable() throws Exception {
+               long duration = 123912931293L;
+               long triggerTimestamp = 10123;
+               long failureTimestamp = triggerTimestamp + duration;
+
+               Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+               JobVertexID jobVertexId = new JobVertexID();
+               taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1));
+
+               FailedCheckpointStats failed = new FailedCheckpointStats(
+                       123123123L,
+                       triggerTimestamp,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       1337,
+                       taskStats,
+                       3,
+                       190890123,
+                       0,
+                       failureTimestamp,
+                       null,
+                       new NotSerializableException("message"));
+
+               FailedCheckpointStats copy = 
CommonTestUtils.createCopySerializable(failed);
+
+               assertEquals(failed.getCheckpointId(), copy.getCheckpointId());
+               assertEquals(failed.getTriggerTimestamp(), 
copy.getTriggerTimestamp());
+               assertEquals(failed.getProperties(), copy.getProperties());
+               assertEquals(failed.getNumberOfSubtasks(), 
copy.getNumberOfSubtasks());
+               assertEquals(failed.getNumberOfAcknowledgedSubtasks(), 
copy.getNumberOfAcknowledgedSubtasks());
+               assertEquals(failed.getEndToEndDuration(), 
copy.getEndToEndDuration());
+               assertEquals(failed.getStateSize(), copy.getStateSize());
+               assertEquals(failed.getLatestAcknowledgedSubtaskStats(), 
copy.getLatestAcknowledgedSubtaskStats());
+               assertEquals(failed.getStatus(), copy.getStatus());
+               assertEquals(failed.getFailureMessage(), 
copy.getFailureMessage());
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 854e106..6c5e8fd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.HashMap;
-import java.util.Map;
 
 import static junit.framework.TestCase.assertFalse;
 import static org.junit.Assert.assertEquals;
@@ -47,7 +47,7 @@ public class PendingCheckpointStatsTest {
                TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
                int totalSubtaskCount = task1.getNumberOfSubtasks() + 
task2.getNumberOfSubtasks();
 
-               Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+               HashMap<JobVertexID, TaskStateStats> taskStats = new 
HashMap<>();
                taskStats.put(task1.getJobVertexId(), task1);
                taskStats.put(task2.getJobVertexId(), task2);
 
@@ -128,7 +128,7 @@ public class PendingCheckpointStatsTest {
                TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
                TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 
-               Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+               HashMap<JobVertexID, TaskStateStats> taskStats = new 
HashMap<>();
                taskStats.put(task1.getJobVertexId(), task1);
                taskStats.put(task2.getJobVertexId(), task2);
 
@@ -165,7 +165,6 @@ public class PendingCheckpointStatsTest {
                assertNotNull(completed);
                assertEquals(CheckpointStatsStatus.COMPLETED, 
completed.getStatus());
                assertFalse(completed.isDiscarded());
-               assertEquals(discardCallback, completed.getDiscardCallback());
                discardCallback.notifyDiscardedCheckpoint();
                assertTrue(completed.isDiscarded());
                assertEquals(externalPath, completed.getExternalPath());
@@ -189,7 +188,7 @@ public class PendingCheckpointStatsTest {
                TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
                TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 
-               Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+               HashMap<JobVertexID, TaskStateStats> taskStats = new 
HashMap<>();
                taskStats.put(task1.getJobVertexId(), task1);
                taskStats.put(task2.getJobVertexId(), task2);
 
@@ -240,6 +239,35 @@ public class PendingCheckpointStatsTest {
                assertEquals(task2, 
failed.getTaskStateStats(task2.getJobVertexId()));
        }
 
+       @Test
+       public void testIsJavaSerializable() throws Exception {
+               TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
+               TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
+
+               HashMap<JobVertexID, TaskStateStats> taskStats = new 
HashMap<>();
+               taskStats.put(task1.getJobVertexId(), task1);
+               taskStats.put(task2.getJobVertexId(), task2);
+
+               PendingCheckpointStats pending = new PendingCheckpointStats(
+                       123123123L,
+                       10123L,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       1337,
+                       taskStats,
+                       
mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class));
+
+               PendingCheckpointStats copy = 
CommonTestUtils.createCopySerializable(pending);
+
+               assertEquals(pending.getCheckpointId(), copy.getCheckpointId());
+               assertEquals(pending.getTriggerTimestamp(), 
copy.getTriggerTimestamp());
+               assertEquals(pending.getProperties(), copy.getProperties());
+               assertEquals(pending.getNumberOfSubtasks(), 
copy.getNumberOfSubtasks());
+               assertEquals(pending.getNumberOfAcknowledgedSubtasks(), 
copy.getNumberOfAcknowledgedSubtasks());
+               assertEquals(pending.getEndToEndDuration(), 
copy.getEndToEndDuration());
+               assertEquals(pending.getStateSize(), copy.getStateSize());
+               assertEquals(pending.getLatestAcknowledgedSubtaskStats(), 
copy.getLatestAcknowledgedSubtaskStats());
+               assertEquals(pending.getStatus(), copy.getStatus());
+       }
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
index 75c40c5..8514f33 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -54,4 +55,39 @@ public class SubtaskStateStatsTest {
                // Trigger timestamp < ack timestamp
                assertEquals(0, stats.getEndToEndDuration(ackTimestamp + 1));
        }
+
+       /**
+        * Tests that the subtask state stats are actually serializable.
+        */
+       @Test
+       public void testIsJavaSerializable() throws Exception {
+               SubtaskStateStats stats = new SubtaskStateStats(
+                       0,
+                       Integer.MAX_VALUE + 1L,
+                       Integer.MAX_VALUE + 2L,
+                       Integer.MAX_VALUE + 3L,
+                       Integer.MAX_VALUE + 4L,
+                       Integer.MAX_VALUE + 5L,
+                       Integer.MAX_VALUE + 6L);
+
+               SubtaskStateStats copy = 
CommonTestUtils.createCopySerializable(stats);
+
+               assertEquals(0, copy.getSubtaskIndex());
+               assertEquals(Integer.MAX_VALUE + 1L, copy.getAckTimestamp());
+               assertEquals(Integer.MAX_VALUE + 2L, copy.getStateSize());
+               assertEquals(Integer.MAX_VALUE + 3L, 
copy.getSyncCheckpointDuration());
+               assertEquals(Integer.MAX_VALUE + 4L, 
copy.getAsyncCheckpointDuration());
+               assertEquals(Integer.MAX_VALUE + 5L, 
copy.getAlignmentBuffered());
+               assertEquals(Integer.MAX_VALUE + 6L, 
copy.getAlignmentDuration());
+
+               // Check duration helper
+               long ackTimestamp = copy.getAckTimestamp();
+               long triggerTimestamp = ackTimestamp - 10123;
+               assertEquals(10123, copy.getEndToEndDuration(triggerTimestamp));
+
+               // Trigger timestamp > ack timestamp: the duration is expected to be 0
+               assertEquals(0, copy.getEndToEndDuration(ackTimestamp + 1));
+
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
index 94edd9e..dd46fe2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.junit.Test;
 
@@ -31,6 +32,8 @@ import static org.junit.Assert.assertTrue;
 
 public class TaskStateStatsTest {
 
+       private final ThreadLocalRandom rand = ThreadLocalRandom.current();
+
        /**
         * Tests that subtask stats are correctly collected.
         */
@@ -48,8 +51,6 @@ public class TaskStateStatsTest {
                assertEquals(-1, taskStats.getLatestAckTimestamp());
                assertArrayEquals(subtasks, taskStats.getSubtaskStats());
 
-               ThreadLocalRandom rand = ThreadLocalRandom.current();
-
                long stateSize = 0;
                long alignmentBuffered = 0;
 
@@ -90,4 +91,45 @@ public class TaskStateStatsTest {
                assertEquals(subtasks.length, 
summary.getAlignmentBufferedStats().getCount());
                assertEquals(subtasks.length, 
summary.getAlignmentDurationStats().getCount());
        }
+
+       @Test
+       public void testIsJavaSerializable() throws Exception {
+               JobVertexID jobVertexId = new JobVertexID();
+               SubtaskStateStats[] subtasks = new SubtaskStateStats[7];
+
+               TaskStateStats taskStats = new TaskStateStats(jobVertexId, 
subtasks.length);
+
+               long stateSize = 0;
+               long alignmentBuffered = 0;
+
+               for (int i = 0; i < subtasks.length; i++) {
+                       subtasks[i] = new SubtaskStateStats(
+                               i,
+                               rand.nextInt(128),
+                               rand.nextInt(128),
+                               rand.nextInt(128),
+                               rand.nextInt(128),
+                               rand.nextInt(128),
+                               rand.nextInt(128));
+
+                       stateSize += subtasks[i].getStateSize();
+                       alignmentBuffered += subtasks[i].getAlignmentBuffered();
+
+                       taskStats.reportSubtaskStats(subtasks[i]);
+               }
+
+               TaskStateStats copy = 
CommonTestUtils.createCopySerializable(taskStats);
+
+               assertEquals(stateSize, copy.getStateSize());
+               assertEquals(alignmentBuffered, copy.getAlignmentBuffered());
+
+               TaskStateStats.TaskStateStatsSummary summary = 
copy.getSummaryStats();
+               assertEquals(subtasks.length, 
summary.getStateSizeStats().getCount());
+               assertEquals(subtasks.length, 
summary.getAckTimestampStats().getCount());
+               assertEquals(subtasks.length, 
summary.getSyncCheckpointDurationStats().getCount());
+               assertEquals(subtasks.length, 
summary.getAsyncCheckpointDurationStats().getCount());
+               assertEquals(subtasks.length, 
summary.getAlignmentBufferedStats().getCount());
+               assertEquals(subtasks.length, 
summary.getAlignmentDurationStats().getCount());
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 6d7427a..9b1064d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -172,11 +172,8 @@ public class ArchivedExecutionGraphTest {
                // 
-------------------------------------------------------------------------------------------------------------
                // CheckpointStats
                // 
-------------------------------------------------------------------------------------------------------------
-               CheckpointStatsTracker runtimeStats = 
runtimeGraph.getCheckpointStatsTracker();
-               CheckpointStatsTracker archivedStats = 
archivedGraph.getCheckpointStatsTracker();
-
-               CheckpointStatsSnapshot runtimeSnapshot = 
runtimeStats.createSnapshot();
-               CheckpointStatsSnapshot archivedSnapshot = 
archivedStats.createSnapshot();
+               CheckpointStatsSnapshot runtimeSnapshot = 
runtimeGraph.getCheckpointStatsSnapshot();
+               CheckpointStatsSnapshot archivedSnapshot = 
archivedGraph.getCheckpointStatsSnapshot();
 
                
assertEquals(runtimeSnapshot.getSummaryStats().getEndToEndDurationStats().getAverage(),
 archivedSnapshot.getSummaryStats().getEndToEndDurationStats().getAverage());
                
assertEquals(runtimeSnapshot.getSummaryStats().getEndToEndDurationStats().getMinimum(),
 archivedSnapshot.getSummaryStats().getEndToEndDurationStats().getMinimum());

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
new file mode 100644
index 0000000..667dbca
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class JobSnapshottingSettingsTest {
+
+       /**
+        * Tests that the settings are actually serializable.
+        */
+       @Test
+       public void testIsJavaSerializable() throws Exception {
+               JobSnapshottingSettings settings = new JobSnapshottingSettings(
+                       Arrays.asList(new JobVertexID(), new JobVertexID()),
+                       Arrays.asList(new JobVertexID(), new JobVertexID()),
+                       Arrays.asList(new JobVertexID(), new JobVertexID()),
+                       1231231,
+                       1231,
+                       112,
+                       12,
+                       
ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+                       false);
+
+               JobSnapshottingSettings copy = 
CommonTestUtils.createCopySerializable(settings);
+               assertEquals(settings.getVerticesToAcknowledge(), 
copy.getVerticesToAcknowledge());
+               assertEquals(settings.getVerticesToConfirm(), 
copy.getVerticesToConfirm());
+               assertEquals(settings.getVerticesToTrigger(), 
copy.getVerticesToTrigger());
+               assertEquals(settings.getCheckpointInterval(), 
copy.getCheckpointInterval());
+               assertEquals(settings.getCheckpointTimeout(), 
copy.getCheckpointTimeout());
+               assertEquals(settings.getMinPauseBetweenCheckpoints(), 
copy.getMinPauseBetweenCheckpoints());
+               assertEquals(settings.getMaxConcurrentCheckpoints(), 
copy.getMaxConcurrentCheckpoints());
+               
assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(),
 copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
+               
assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(),
 copy.getExternalizedCheckpointSettings().deleteOnCancellation());
+               assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+       }
+}

Reply via email to