Repository: flink
Updated Branches:
  refs/heads/master c607b9a22 -> 42328bd9b


[hotfix] [checkpoints] Rename JobSnapshottingSettings to 
JobCheckpointingSettings

Cleanup to consistently use
  - checkpoint for the overall fault tolerance mechanism and procedure
  - snapshot for an operators state snapshot that is part of a checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f0d6769
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f0d6769
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f0d6769

Branch: refs/heads/master
Commit: 5f0d6769051be8db8ee12659e13d22e5c5fd2f2d
Parents: 4d8627c
Author: Stephan Ewen <[email protected]>
Authored: Mon Mar 27 18:23:13 2017 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Apr 20 10:59:50 2017 +0200

----------------------------------------------------------------------
 .../jobmanager/JMXJobManagerMetricTest.java     |   4 +-
 .../checkpoints/CheckpointConfigHandler.java    |   4 +-
 .../CheckpointConfigHandlerTest.java            |  14 +-
 .../checkpoint/CheckpointStatsTracker.java      |  14 +-
 .../executiongraph/AccessExecutionGraph.java    |   6 +-
 .../executiongraph/ArchivedExecutionGraph.java  |  13 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   7 +-
 .../executiongraph/ExecutionGraphBuilder.java   |   4 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  12 +-
 .../tasks/JobCheckpointingSettings.java         | 151 +++++++++++++++++++
 .../jobgraph/tasks/JobSnapshottingSettings.java | 151 -------------------
 .../checkpoint/CheckpointStatsTrackerTest.java  |  16 +-
 .../checkpoint/CoordinatorShutdownTest.java     |   6 +-
 .../ArchivedExecutionGraphTest.java             |   4 +-
 .../ExecutionGraphDeploymentTest.java           |   4 +-
 .../tasks/JobCheckpointingSettingsTest.java     |  65 ++++++++
 .../tasks/JobSnapshottingSettingsTest.java      |  65 --------
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +-
 .../runtime/jobmanager/JobManagerTest.java      |  12 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   4 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |   8 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   4 +-
 .../graph/StreamingJobGraphGeneratorTest.java   |   4 +-
 23 files changed, 287 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 1fdac65..934a621 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.junit.Assert;
@@ -70,7 +70,7 @@ public class JMXJobManagerMetricTest {
                        
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
 
                        JobGraph jobGraph = new JobGraph("TestingJob", 
sourceJobVertex);
-                       jobGraph.setSnapshotSettings(new 
JobSnapshottingSettings(
+                       jobGraph.setSnapshotSettings(new 
JobCheckpointingSettings(
                                Collections.<JobVertexID>emptyList(),
                                Collections.<JobVertexID>emptyList(),
                                Collections.<JobVertexID>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 947b7c3..7914c29 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -21,7 +21,7 @@ package 
org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
@@ -69,7 +69,7 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandle
        private static String createCheckpointConfigJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
-               JobSnapshottingSettings settings = 
graph.getJobSnapshottingSettings();
+               JobCheckpointingSettings settings = 
graph.getJobCheckpointingSettings();
 
                if (settings == null) {
                        return "{}";

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index 9d339f5..6e48973 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -50,7 +50,7 @@ public class CheckpointConfigHandlerTest {
 
                AccessExecutionGraph graph = graphAndSettings.graph;
                when(graph.getJobID()).thenReturn(new JobID());
-               JobSnapshottingSettings settings = 
graphAndSettings.snapshottingSettings;
+               JobCheckpointingSettings settings = 
graphAndSettings.snapshottingSettings;
                ExternalizedCheckpointSettings externalizedSettings = 
graphAndSettings.externalizedSettings;
                
                Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(graph);
@@ -90,7 +90,7 @@ public class CheckpointConfigHandlerTest {
                GraphAndSettings graphAndSettings = 
createGraphAndSettings(false, true);
 
                AccessExecutionGraph graph = graphAndSettings.graph;
-               JobSnapshottingSettings settings = 
graphAndSettings.snapshottingSettings;
+               JobCheckpointingSettings settings = 
graphAndSettings.snapshottingSettings;
 
                CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
                String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());
@@ -156,7 +156,7 @@ public class CheckpointConfigHandlerTest {
                        ? 
ExternalizedCheckpointSettings.externalizeCheckpoints(true)
                        : ExternalizedCheckpointSettings.none();
 
-               JobSnapshottingSettings settings = new JobSnapshottingSettings(
+               JobCheckpointingSettings settings = new 
JobCheckpointingSettings(
                        Collections.<JobVertexID>emptyList(),
                        Collections.<JobVertexID>emptyList(),
                        Collections.<JobVertexID>emptyList(),
@@ -169,19 +169,19 @@ public class CheckpointConfigHandlerTest {
                        exactlyOnce);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+               when(graph.getJobCheckpointingSettings()).thenReturn(settings);
 
                return new GraphAndSettings(graph, settings, 
externalizedSetting);
        }
 
        private static class GraphAndSettings {
                public final AccessExecutionGraph graph;
-               public final JobSnapshottingSettings snapshottingSettings;
+               public final JobCheckpointingSettings snapshottingSettings;
                public final ExternalizedCheckpointSettings 
externalizedSettings;
 
                public GraphAndSettings(
                                AccessExecutionGraph graph,
-                               JobSnapshottingSettings snapshottingSettings,
+                               JobCheckpointingSettings snapshottingSettings,
                                ExternalizedCheckpointSettings 
externalizedSettings) {
                        this.graph = graph;
                        this.snapshottingSettings = snapshottingSettings;

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index c7efb7b..313fe13 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -31,7 +31,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 
 /**
  * Tracker for checkpoint statistics.
@@ -67,7 +67,7 @@ public class CheckpointStatsTracker {
        private final int totalSubtaskCount;
 
        /** Snapshotting settings created from the CheckpointConfig. */
-       private final JobSnapshottingSettings jobSnapshottingSettings;
+       private final JobCheckpointingSettings jobCheckpointingSettings;
 
        /** Checkpoint counts. */
        private final CheckpointStatsCounts counts = new 
CheckpointStatsCounts();
@@ -104,19 +104,19 @@ public class CheckpointStatsTracker {
         *
         * @param numRememberedCheckpoints Maximum number of checkpoints to 
remember, including in progress ones.
         * @param jobVertices Job vertices involved in the checkpoints.
-        * @param jobSnapshottingSettings Snapshotting settings created from 
the CheckpointConfig.
+        * @param jobCheckpointingSettings Snapshotting settings created from 
the CheckpointConfig.
         * @param metricGroup Metric group for exposed metrics
         */
        public CheckpointStatsTracker(
                int numRememberedCheckpoints,
                List<ExecutionJobVertex> jobVertices,
-               JobSnapshottingSettings jobSnapshottingSettings,
+               JobCheckpointingSettings jobCheckpointingSettings,
                MetricGroup metricGroup) {
 
                checkArgument(numRememberedCheckpoints >= 0, "Negative number 
of remembered checkpoints");
                this.history = new 
CheckpointStatsHistory(numRememberedCheckpoints);
                this.jobVertices = checkNotNull(jobVertices, "JobVertices");
-               this.jobSnapshottingSettings = 
checkNotNull(jobSnapshottingSettings);
+               this.jobCheckpointingSettings = 
checkNotNull(jobCheckpointingSettings);
 
                // Compute the total subtask count. We do this here in order to 
only
                // do it once.
@@ -143,8 +143,8 @@ public class CheckpointStatsTracker {
         *
         * @return The job's snapshotting settings.
         */
-       public JobSnapshottingSettings getSnapshottingSettings() {
-               return jobSnapshottingSettings;
+       public JobCheckpointingSettings getSnapshottingSettings() {
+               return jobCheckpointingSettings;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 18c2ec2..3b064c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -116,12 +116,12 @@ public interface AccessExecutionGraph {
        CheckpointCoordinator getCheckpointCoordinator();
 
        /**
-        * Returns the {@link JobSnapshottingSettings} or <code>null</code> if
+        * Returns the {@link JobCheckpointingSettings} or <code>null</code> if
         * checkpointing is disabled.
         *
         * @return JobSnapshottingSettings for this execution graph
         */
-       JobSnapshottingSettings getJobSnapshottingSettings();
+       JobCheckpointingSettings getJobCheckpointingSettings();
 
        /**
         * Returns a snapshot of the checkpoint statistics or <code>null</code> 
if

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 334b0d0..b9db1e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
@@ -81,7 +81,7 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
        private final Map<String, SerializedValue<Object>> 
serializedUserAccumulators;
 
        @Nullable
-       private final JobSnapshottingSettings jobSnapshottingSettings;
+       private final JobCheckpointingSettings jobCheckpointingSettings;
 
        @Nullable
        private final CheckpointStatsSnapshot checkpointStatsSnapshot;
@@ -99,7 +99,7 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                        Map<String, SerializedValue<Object>> 
serializedUserAccumulators,
                        ArchivedExecutionConfig executionConfig,
                        boolean isStoppable,
-                       @Nullable JobSnapshottingSettings 
jobSnapshottingSettings,
+                       @Nullable JobCheckpointingSettings 
jobCheckpointingSettings,
                        @Nullable CheckpointStatsSnapshot 
checkpointStatsSnapshot) {
 
                this.jobID = jobID;
@@ -114,7 +114,7 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                this.serializedUserAccumulators = serializedUserAccumulators;
                this.archivedExecutionConfig = executionConfig;
                this.isStoppable = isStoppable;
-               this.jobSnapshottingSettings = jobSnapshottingSettings;
+               this.jobCheckpointingSettings = jobCheckpointingSettings;
                this.checkpointStatsSnapshot = checkpointStatsSnapshot;
        }
 
@@ -210,9 +210,8 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                return null;
        }
 
-       @Override
-       public JobSnapshottingSettings getJobSnapshottingSettings() {
-               return jobSnapshottingSettings;
+       public JobCheckpointingSettings getJobCheckpointingSettings() {
+               return jobCheckpointingSettings;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b21b72b..29b9806 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
@@ -419,8 +419,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                return restartStrategy;
        }
 
-       @Override
-       public JobSnapshottingSettings getJobSnapshottingSettings() {
+       public JobCheckpointingSettings getJobCheckpointingSettings() {
                if (checkpointStatsTracker != null) {
                        return checkpointStatsTracker.getSnapshottingSettings();
                } else {
@@ -1477,7 +1476,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        serializedUserAccumulators,
                        getArchivedExecutionConfig(),
                        isStoppable(),
-                       getJobSnapshottingSettings(),
+                       getJobCheckpointingSettings(),
                        getCheckpointStatsSnapshot());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index a6455f5..a10c62e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.util.DynamicCodeLoadingException;
@@ -159,7 +159,7 @@ public class ExecutionGraphBuilder {
                }
 
                // configure the state checkpointing
-               JobSnapshottingSettings snapshotSettings = 
jobGraph.getSnapshotSettings();
+               JobCheckpointingSettings snapshotSettings = 
jobGraph.getCheckpointingSettings();
                if (snapshotSettings != null) {
                        List<ExecutionJobVertex> triggerVertices = 
                                        
idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f6377e5..2a8af37 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.SerializedValue;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -92,7 +92,7 @@ public class JobGraph implements Serializable {
        private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
        /** The settings for the job checkpoints */
-       private JobSnapshottingSettings snapshotSettings;
+       private JobCheckpointingSettings snapshotSettings;
 
        /** Savepoint restore settings. */
        private SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.none();
@@ -331,17 +331,17 @@ public class JobGraph implements Serializable {
         *
         * @param settings The snapshot settings, or null, to disable 
snapshotting.
         */
-       public void setSnapshotSettings(JobSnapshottingSettings settings) {
+       public void setSnapshotSettings(JobCheckpointingSettings settings) {
                this.snapshotSettings = settings;
        }
 
        /**
         * Gets the settings for asynchronous snapshots. This method returns 
null, when
-        * snapshotting is not enabled.
+        * checkpointing is not enabled.
         *
-        * @return The snapshot settings, or null, if snapshotting is not 
enabled.
+        * @return The snapshot settings, or null, if checkpointing is not 
enabled.
         */
-       public JobSnapshottingSettings getSnapshotSettings() {
+       public JobCheckpointingSettings getCheckpointingSettings() {
                return snapshotSettings;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
new file mode 100644
index 0000000..38130d4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The JobCheckpointingSettings are attached to a JobGraph and describe the 
settings
+ * for the asynchronous checkpoints of the JobGraph, such as interval, and 
which vertices
+ * need to participate.
+ */
+public class JobCheckpointingSettings implements java.io.Serializable {
+       
+       private static final long serialVersionUID = -2593319571078198180L;
+       
+       private final List<JobVertexID> verticesToTrigger;
+
+       private final List<JobVertexID> verticesToAcknowledge;
+
+       private final List<JobVertexID> verticesToConfirm;
+       
+       private final long checkpointInterval;
+       
+       private final long checkpointTimeout;
+       
+       private final long minPauseBetweenCheckpoints;
+       
+       private final int maxConcurrentCheckpoints;
+
+       /** Settings for externalized checkpoints. */
+       private final ExternalizedCheckpointSettings 
externalizedCheckpointSettings;
+
+       /** The default state backend, if configured by the user in the job */
+       @Nullable
+       private final StateBackend defaultStateBackend;
+
+       /**
+        * Flag indicating whether exactly once checkpoint mode has been 
configured.
+        * If <code>false</code>, at least once mode has been configured. This 
is
+        * not a necessary attribute, because the checkpointing mode is only 
relevant
+        * for the stream tasks, but we expose it here to forward it to the web 
runtime
+        * UI.
+        */
+       private final boolean isExactlyOnce;
+
+       public JobCheckpointingSettings(
+                       List<JobVertexID> verticesToTrigger,
+                       List<JobVertexID> verticesToAcknowledge,
+                       List<JobVertexID> verticesToConfirm,
+                       long checkpointInterval,
+                       long checkpointTimeout,
+                       long minPauseBetweenCheckpoints,
+                       int maxConcurrentCheckpoints,
+                       ExternalizedCheckpointSettings 
externalizedCheckpointSettings,
+                       @Nullable StateBackend defaultStateBackend,
+                       boolean isExactlyOnce) {
+
+               // sanity checks
+               if (checkpointInterval < 1 || checkpointTimeout < 1 ||
+                               minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
+                       throw new IllegalArgumentException();
+               }
+               
+               this.verticesToTrigger = requireNonNull(verticesToTrigger);
+               this.verticesToAcknowledge = 
requireNonNull(verticesToAcknowledge);
+               this.verticesToConfirm = requireNonNull(verticesToConfirm);
+               this.checkpointInterval = checkpointInterval;
+               this.checkpointTimeout = checkpointTimeout;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+               this.externalizedCheckpointSettings = 
requireNonNull(externalizedCheckpointSettings);
+               this.defaultStateBackend = defaultStateBackend;
+               this.isExactlyOnce = isExactlyOnce;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       public List<JobVertexID> getVerticesToTrigger() {
+               return verticesToTrigger;
+       }
+       
+       public List<JobVertexID> getVerticesToAcknowledge() {
+               return verticesToAcknowledge;
+       }
+
+       public List<JobVertexID> getVerticesToConfirm() {
+               return verticesToConfirm;
+       }
+
+       public long getCheckpointInterval() {
+               return checkpointInterval;
+       }
+
+       public long getCheckpointTimeout() {
+               return checkpointTimeout;
+       }
+
+       public long getMinPauseBetweenCheckpoints() {
+               return minPauseBetweenCheckpoints;
+       }
+
+       public int getMaxConcurrentCheckpoints() {
+               return maxConcurrentCheckpoints;
+       }
+
+       public ExternalizedCheckpointSettings 
getExternalizedCheckpointSettings() {
+               return externalizedCheckpointSettings;
+       }
+
+       @Nullable
+       public StateBackend getDefaultStateBackend() {
+               return defaultStateBackend;
+       }
+
+       public boolean isExactlyOnce() {
+               return isExactlyOnce;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return String.format("SnapshotSettings: interval=%d, 
timeout=%d, pause-between=%d, " +
+                                               "maxConcurrent=%d, trigger=%s, 
ack=%s, commit=%s",
+                                               checkpointInterval, 
checkpointTimeout,
+                                               minPauseBetweenCheckpoints, 
maxConcurrentCheckpoints,
+                                               verticesToTrigger, 
verticesToAcknowledge, verticesToConfirm);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
deleted file mode 100644
index 233aa88..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateBackend;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * The JobCheckpointingSettings are attached to a JobGraph and describe the 
settings
- * for the asynchronous checkpoints of the JobGraph, such as interval, and 
which vertices
- * need to participate.
- */
-public class JobSnapshottingSettings implements java.io.Serializable {
-       
-       private static final long serialVersionUID = -2593319571078198180L;
-       
-       private final List<JobVertexID> verticesToTrigger;
-
-       private final List<JobVertexID> verticesToAcknowledge;
-
-       private final List<JobVertexID> verticesToConfirm;
-       
-       private final long checkpointInterval;
-       
-       private final long checkpointTimeout;
-       
-       private final long minPauseBetweenCheckpoints;
-       
-       private final int maxConcurrentCheckpoints;
-
-       /** Settings for externalized checkpoints. */
-       private final ExternalizedCheckpointSettings 
externalizedCheckpointSettings;
-
-       /** The default state backend, if configured by the user in the job */
-       @Nullable
-       private final StateBackend defaultStateBackend;
-
-       /**
-        * Flag indicating whether exactly once checkpoint mode has been 
configured.
-        * If <code>false</code>, at least once mode has been configured. This 
is
-        * not a necessary attribute, because the checkpointing mode is only 
relevant
-        * for the stream tasks, but we expose it here to forward it to the web 
runtime
-        * UI.
-        */
-       private final boolean isExactlyOnce;
-
-       public JobSnapshottingSettings(
-                       List<JobVertexID> verticesToTrigger,
-                       List<JobVertexID> verticesToAcknowledge,
-                       List<JobVertexID> verticesToConfirm,
-                       long checkpointInterval,
-                       long checkpointTimeout,
-                       long minPauseBetweenCheckpoints,
-                       int maxConcurrentCheckpoints,
-                       ExternalizedCheckpointSettings 
externalizedCheckpointSettings,
-                       @Nullable StateBackend defaultStateBackend,
-                       boolean isExactlyOnce) {
-
-               // sanity checks
-               if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-                               minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
-                       throw new IllegalArgumentException();
-               }
-               
-               this.verticesToTrigger = requireNonNull(verticesToTrigger);
-               this.verticesToAcknowledge = 
requireNonNull(verticesToAcknowledge);
-               this.verticesToConfirm = requireNonNull(verticesToConfirm);
-               this.checkpointInterval = checkpointInterval;
-               this.checkpointTimeout = checkpointTimeout;
-               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
-               this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
-               this.externalizedCheckpointSettings = 
requireNonNull(externalizedCheckpointSettings);
-               this.defaultStateBackend = defaultStateBackend;
-               this.isExactlyOnce = isExactlyOnce;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       public List<JobVertexID> getVerticesToTrigger() {
-               return verticesToTrigger;
-       }
-       
-       public List<JobVertexID> getVerticesToAcknowledge() {
-               return verticesToAcknowledge;
-       }
-
-       public List<JobVertexID> getVerticesToConfirm() {
-               return verticesToConfirm;
-       }
-
-       public long getCheckpointInterval() {
-               return checkpointInterval;
-       }
-
-       public long getCheckpointTimeout() {
-               return checkpointTimeout;
-       }
-
-       public long getMinPauseBetweenCheckpoints() {
-               return minPauseBetweenCheckpoints;
-       }
-
-       public int getMaxConcurrentCheckpoints() {
-               return maxConcurrentCheckpoints;
-       }
-
-       public ExternalizedCheckpointSettings 
getExternalizedCheckpointSettings() {
-               return externalizedCheckpointSettings;
-       }
-
-       @Nullable
-       public StateBackend getDefaultStateBackend() {
-               return defaultStateBackend;
-       }
-
-       public boolean isExactlyOnce() {
-               return isExactlyOnce;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return String.format("SnapshotSettings: interval=%d, 
timeout=%d, pause-between=%d, " +
-                                               "maxConcurrent=%d, trigger=%s, 
ack=%s, commit=%s",
-                                               checkpointInterval, 
checkpointTimeout,
-                                               minPauseBetweenCheckpoints, 
maxConcurrentCheckpoints,
-                                               verticesToTrigger, 
verticesToAcknowledge, verticesToConfirm);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index aaf1774..d66d0be 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -44,7 +44,7 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.junit.Test;
 
 public class CheckpointStatsTrackerTest {
@@ -58,7 +58,7 @@ public class CheckpointStatsTrackerTest {
                when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
                when(jobVertex.getParallelism()).thenReturn(1);
 
-               JobSnapshottingSettings snapshottingSettings = new 
JobSnapshottingSettings(
+               JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
                        Collections.singletonList(new JobVertexID()),
                        Collections.singletonList(new JobVertexID()),
                        Collections.singletonList(new JobVertexID()),
@@ -94,7 +94,7 @@ public class CheckpointStatsTrackerTest {
                CheckpointStatsTracker tracker = new CheckpointStatsTracker(
                        0,
                        Collections.singletonList(jobVertex),
-                       mock(JobSnapshottingSettings.class),
+                       mock(JobCheckpointingSettings.class),
                        new UnregisteredMetricsGroup());
 
                PendingCheckpointStats pending = 
tracker.reportPendingCheckpoint(
@@ -142,7 +142,7 @@ public class CheckpointStatsTrackerTest {
                CheckpointStatsTracker tracker = new CheckpointStatsTracker(
                        10,
                        Collections.singletonList(jobVertex),
-                       mock(JobSnapshottingSettings.class),
+                       mock(JobCheckpointingSettings.class),
                        new UnregisteredMetricsGroup());
 
                // Completed checkpoint
@@ -247,7 +247,7 @@ public class CheckpointStatsTrackerTest {
                CheckpointStatsTracker tracker = new CheckpointStatsTracker(
                        10,
                        Collections.singletonList(jobVertex),
-                       mock(JobSnapshottingSettings.class),
+                       mock(JobCheckpointingSettings.class),
                        new UnregisteredMetricsGroup());
 
                CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
@@ -293,7 +293,7 @@ public class CheckpointStatsTrackerTest {
                new CheckpointStatsTracker(
                        0,
                        Collections.singletonList(jobVertex),
-                       mock(JobSnapshottingSettings.class),
+                       mock(JobCheckpointingSettings.class),
                        metricGroup);
 
                verify(metricGroup, 
times(1)).gauge(eq(CheckpointStatsTracker.NUMBER_OF_CHECKPOINTS_METRIC), 
any(Gauge.class));
@@ -409,7 +409,7 @@ public class CheckpointStatsTrackerTest {
                CheckpointStatsTracker stats = new CheckpointStatsTracker(
                        0,
                        Collections.singletonList(jobVertex),
-                       mock(JobSnapshottingSettings.class),
+                       mock(JobCheckpointingSettings.class),
                        metricGroup);
 
                // Make sure to adjust this test if metrics are added/removed
@@ -522,7 +522,7 @@ public class CheckpointStatsTrackerTest {
                return new CheckpointStatsTracker(
                        0,
                        Collections.singletonList(jobVertex),
-                       mock(JobSnapshottingSettings.class),
+                       mock(JobCheckpointingSettings.class),
                        new UnregisteredMetricsGroup());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 976da48..2e0bd76 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -65,7 +65,7 @@ public class CoordinatorShutdownTest {
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
                        
                        JobGraph testGraph = new JobGraph("test job", vertex);
-                       testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
+                       testGraph.setSnapshotSettings(new 
JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList, 
                                        5000, 60000, 0L, Integer.MAX_VALUE, 
ExternalizedCheckpointSettings.none(), null, true));
                        
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
@@ -124,7 +124,7 @@ public class CoordinatorShutdownTest {
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
 
                        JobGraph testGraph = new JobGraph("test job", vertex);
-                       testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+                       testGraph.setSnapshotSettings(new 
JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList,
                                        5000, 60000, 0L, Integer.MAX_VALUE, 
ExternalizedCheckpointSettings.none(), null, true));
                        
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 077ab53..f96b624 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.BeforeClass;
@@ -116,7 +116,7 @@ public class ArchivedExecutionGraphTest {
                CheckpointStatsTracker statsTracker = new 
CheckpointStatsTracker(
                                0,
                                jobVertices,
-                               mock(JobSnapshottingSettings.class),
+                               mock(JobCheckpointingSettings.class),
                                new UnregisteredMetricsGroup());
 
                runtimeGraph.enableCheckpointing(

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 8d91b84..866f55c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.operators.BatchTask;
@@ -559,7 +559,7 @@ public class ExecutionGraphDeploymentTest {
 
                final JobID jobId = new JobID();
                final JobGraph jobGraph = new JobGraph(jobId, "test");
-               jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+               jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
                        Collections.<JobVertexID>emptyList(),
                        Collections.<JobVertexID>emptyList(),
                        Collections.<JobVertexID>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
new file mode 100644
index 0000000..c3524fa
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class JobCheckpointingSettingsTest {
+
+       /**
+        * Tests that the settings are actually serializable.
+        */
+       @Test
+       public void testIsJavaSerializable() throws Exception {
+               JobCheckpointingSettings settings = new 
JobCheckpointingSettings(
+                       Arrays.asList(new JobVertexID(), new JobVertexID()),
+                       Arrays.asList(new JobVertexID(), new JobVertexID()),
+                       Arrays.asList(new JobVertexID(), new JobVertexID()),
+                       1231231,
+                       1231,
+                       112,
+                       12,
+                       
ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+                       new MemoryStateBackend(),
+                       false);
+
+               JobCheckpointingSettings copy = 
CommonTestUtils.createCopySerializable(settings);
+               assertEquals(settings.getVerticesToAcknowledge(), 
copy.getVerticesToAcknowledge());
+               assertEquals(settings.getVerticesToConfirm(), 
copy.getVerticesToConfirm());
+               assertEquals(settings.getVerticesToTrigger(), 
copy.getVerticesToTrigger());
+               assertEquals(settings.getCheckpointInterval(), 
copy.getCheckpointInterval());
+               assertEquals(settings.getCheckpointTimeout(), 
copy.getCheckpointTimeout());
+               assertEquals(settings.getMinPauseBetweenCheckpoints(), 
copy.getMinPauseBetweenCheckpoints());
+               assertEquals(settings.getMaxConcurrentCheckpoints(), 
copy.getMaxConcurrentCheckpoints());
+               
assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(),
 copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
+               
assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(),
 copy.getExternalizedCheckpointSettings().deleteOnCancellation());
+               assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+               assertNotNull(copy.getDefaultStateBackend());
+               assertTrue(copy.getDefaultStateBackend().getClass() == 
MemoryStateBackend.class);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
deleted file mode 100644
index 2508d5c..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class JobSnapshottingSettingsTest {
-
-       /**
-        * Tests that the settings are actually serializable.
-        */
-       @Test
-       public void testIsJavaSerializable() throws Exception {
-               JobSnapshottingSettings settings = new JobSnapshottingSettings(
-                       Arrays.asList(new JobVertexID(), new JobVertexID()),
-                       Arrays.asList(new JobVertexID(), new JobVertexID()),
-                       Arrays.asList(new JobVertexID(), new JobVertexID()),
-                       1231231,
-                       1231,
-                       112,
-                       12,
-                       
ExternalizedCheckpointSettings.externalizeCheckpoints(true),
-                       new MemoryStateBackend(),
-                       false);
-
-               JobSnapshottingSettings copy = 
CommonTestUtils.createCopySerializable(settings);
-               assertEquals(settings.getVerticesToAcknowledge(), 
copy.getVerticesToAcknowledge());
-               assertEquals(settings.getVerticesToConfirm(), 
copy.getVerticesToConfirm());
-               assertEquals(settings.getVerticesToTrigger(), 
copy.getVerticesToTrigger());
-               assertEquals(settings.getCheckpointInterval(), 
copy.getCheckpointInterval());
-               assertEquals(settings.getCheckpointTimeout(), 
copy.getCheckpointTimeout());
-               assertEquals(settings.getMinPauseBetweenCheckpoints(), 
copy.getMinPauseBetweenCheckpoints());
-               assertEquals(settings.getMaxConcurrentCheckpoints(), 
copy.getMaxConcurrentCheckpoints());
-               
assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(),
 copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
-               
assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(),
 copy.getExternalizedCheckpointSettings().deleteOnCancellation());
-               assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
-               assertNotNull(copy.getDefaultStateBackend());
-               assertTrue(copy.getDefaultStateBackend().getClass() == 
MemoryStateBackend.class);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index dcf4722..6eacaac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -220,7 +220,7 @@ public class JobManagerHARecoveryTest {
                        JobGraph jobGraph = new JobGraph("TestingJob", 
sourceJobVertex);
 
                        List<JobVertexID> vertexId = 
Collections.singletonList(sourceJobVertex.getID());
-                       jobGraph.setSnapshotSettings(new 
JobSnapshottingSettings(
+                       jobGraph.setSnapshotSettings(new 
JobCheckpointingSettings(
                                        vertexId,
                                        vertexId,
                                        vertexId,

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 4dec84b..d7fc71d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import 
org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
@@ -821,7 +821,7 @@ public class JobManagerTest extends TestLogger {
 
                        JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
 
-                       JobSnapshottingSettings snapshottingSettings = new 
JobSnapshottingSettings(
+                       JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
                                        
Collections.singletonList(sourceVertex.getID()),
                                        
Collections.singletonList(sourceVertex.getID()),
                                        
Collections.singletonList(sourceVertex.getID()),
@@ -947,7 +947,7 @@ public class JobManagerTest extends TestLogger {
 
                        JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
 
-                       JobSnapshottingSettings snapshottingSettings = new 
JobSnapshottingSettings(
+                       JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
                                Collections.singletonList(sourceVertex.getID()),
                                Collections.singletonList(sourceVertex.getID()),
                                Collections.singletonList(sourceVertex.getID()),
@@ -1053,7 +1053,7 @@ public class JobManagerTest extends TestLogger {
 
                        JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
 
-                       JobSnapshottingSettings snapshottingSettings = new 
JobSnapshottingSettings(
+                       JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
                                        
Collections.singletonList(sourceVertex.getID()),
                                        
Collections.singletonList(sourceVertex.getID()),
                                        
Collections.singletonList(sourceVertex.getID()),
@@ -1156,7 +1156,7 @@ public class JobManagerTest extends TestLogger {
 
                        JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
 
-                       JobSnapshottingSettings snapshottingSettings = new 
JobSnapshottingSettings(
+                       JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
                                        
Collections.singletonList(sourceVertex.getID()),
                                        
Collections.singletonList(sourceVertex.getID()),
                                        
Collections.singletonList(sourceVertex.getID()),
@@ -1203,7 +1203,7 @@ public class JobManagerTest extends TestLogger {
 
                        JobGraph newJobGraph = new JobGraph("NewTestingJob", 
newSourceVertex);
 
-                       JobSnapshottingSettings newSnapshottingSettings = new 
JobSnapshottingSettings(
+                       JobCheckpointingSettings newSnapshottingSettings = new 
JobCheckpointingSettings(
                                        
Collections.singletonList(newSourceVertex.getID()),
                                        
Collections.singletonList(newSourceVertex.getID()),
                                        
Collections.singletonList(newSourceVertex.getID()),

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index ba5f973..bdff401 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -228,7 +228,7 @@ public class JobSubmitTest {
                List<JobVertexID> vertexIdList = 
Collections.singletonList(jobVertex.getID());
 
                JobGraph jg = new JobGraph("test job", jobVertex);
-               jg.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+               jg.setSnapshotSettings(new 
JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList,
                        5000, 5000, 0L, 10, 
ExternalizedCheckpointSettings.none(), null, true));
                return jg;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 5374d01..ce8517e 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedChec
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
-import 
org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, 
JobSnapshottingSettings}
+import 
org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, 
JobCheckpointingSettings}
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, 
JobVertex, ScheduleMode}
 import org.apache.flink.runtime.jobmanager.Tasks._
 import 
org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, 
SlotSharingGroup}
@@ -827,7 +827,7 @@ class JobManagerITCase(_system: ActorSystem)
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
           val jobGraph = new JobGraph(jobVertex)
-          jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+          jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
@@ -887,7 +887,7 @@ class JobManagerITCase(_system: ActorSystem)
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
           val jobGraph = new JobGraph(jobVertex)
-          jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+          jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
@@ -955,7 +955,7 @@ class JobManagerITCase(_system: ActorSystem)
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
           val jobGraph = new JobGraph(jobVertex)
-          jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+          jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 794de5a..7d62273 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -579,7 +579,7 @@ public class StreamingJobGraphGenerator {
                                "exactly-once or at-least-once.");
                }
 
-               JobSnapshottingSettings settings = new JobSnapshottingSettings(
+               JobCheckpointingSettings settings = new 
JobCheckpointingSettings(
                                triggerVertices, ackVertices, commitVertices, 
interval,
                                cfg.getCheckpointTimeout(), 
cfg.getMinPauseBetweenCheckpoints(),
                                cfg.getMaxConcurrentCheckpoints(),

http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6d2fcaa..2c71a07 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -118,7 +118,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                StreamingJobGraphGenerator jobGraphGenerator = new 
StreamingJobGraphGenerator(streamGraph);
                JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
-               JobSnapshottingSettings snapshottingSettings = 
jobGraph.getSnapshotSettings();
+               JobCheckpointingSettings snapshottingSettings = 
jobGraph.getCheckpointingSettings();
                assertEquals(Long.MAX_VALUE, 
snapshottingSettings.getCheckpointInterval());
        }
 

Reply via email to