Repository: flink Updated Branches: refs/heads/release-1.1 16e7c7867 -> b4207503c
[FLINK-4510] [checkpoint] Always create CheckpointCoordinator Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4207503 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4207503 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4207503 Branch: refs/heads/release-1.1 Commit: b4207503c2e674ce501404cbc6cb264647f17397 Parents: 16e7c78 Author: Jark Wu <[email protected]> Authored: Mon Oct 17 19:46:15 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Thu Oct 27 17:30:50 2016 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 2 +- .../runtime/executiongraph/ExecutionGraph.java | 46 ++++--- .../flink/runtime/jobmanager/JobManager.scala | 44 +++--- .../runtime/jobmanager/JobManagerTest.java | 138 +++++++++++++++++-- .../api/graph/StreamingJobGraphGenerator.java | 68 +++++---- .../graph/StreamingJobGraphGeneratorTest.java | 28 +++- 6 files changed, 234 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6aaa014..409f05b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -411,7 +411,7 @@ public class CheckpointCoordinator { } //make sure the minimum interval between checkpoints has passed - if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) { + if (lastTriggeredCheckpoint + 
minPauseBetweenCheckpoints > timestamp && baseInterval != Long.MAX_VALUE) { if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(); currentPeriodicTrigger = null; http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 8cf8354..cc8e75d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -397,27 +397,31 @@ public class ExecutionGraph implements Serializable { checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker"); - // create the coordinator that triggers and commits checkpoints and holds the state - checkpointCoordinator = new CheckpointCoordinator( - jobID, - interval, - checkpointTimeout, - minPauseBetweenCheckpoints, - maxConcurrentCheckpoints, - numberKeyGroups, - tasksToTrigger, - tasksToWaitFor, - tasksToCommitTo, - userClassLoader, - checkpointIDCounter, - checkpointStore, - recoveryMode, - checkpointStatsTracker); - - // the periodic checkpoint scheduler is activated and deactivated as a result of - // job status changes (running -> on, all other states -> off) - registerJobStatusListener( - checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID)); + // an interval of Long.MAX_VALUE indicates that periodic checkpointing is disabled, + // the CheckpointCoordinator should be created only if the interval is not Long.MAX_VALUE + if (interval != Long.MAX_VALUE) { + // create the coordinator that triggers and commits checkpoints and holds the state + checkpointCoordinator = new CheckpointCoordinator( + jobID, + 
interval, + checkpointTimeout, + minPauseBetweenCheckpoints, + maxConcurrentCheckpoints, + numberKeyGroups, + tasksToTrigger, + tasksToWaitFor, + tasksToCommitTo, + userClassLoader, + checkpointIDCounter, + checkpointStore, + recoveryMode, + checkpointStatsTracker); + + // the periodic checkpoint scheduler is activated and deactivated as a result of + // job status changes (running -> on, all other states -> off) + registerJobStatusListener( + checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID)); + } // Savepoint Coordinator savepointCoordinator = new SavepointCoordinator( http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index a4c2403..f16747a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1360,22 +1360,22 @@ class JobManager( val checkpointCoordinator = graph.getCheckpointCoordinator() val savepointCoordinator = graph.getSavepointCoordinator() - if (checkpointCoordinator != null && savepointCoordinator != null) { + if (checkpointCoordinator != null || savepointCoordinator != null) { future { try { - if (checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { + if (checkpointCoordinator != null && + checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { // OK, this is the common case + } else if ( + savepointCoordinator != null && + !savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) { + + // Tried the savepoint coordinator if the message was not + // addressed to the periodic checkpoint coordinator. 
+ log.info("Received message for non-existing checkpoint " + + ackMessage.getCheckpointId) } - else { - // Try the savepoint coordinator if the message was not addressed - // to the periodic checkpoint coordinator. - if (!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) { - log.info("Received message for non-existing checkpoint " + - ackMessage.getCheckpointId) - } - } - } - catch { + } catch { case t: Throwable => log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t) } @@ -1397,24 +1397,26 @@ class JobManager( val checkpointCoordinator = graph.getCheckpointCoordinator() val savepointCoordinator = graph.getSavepointCoordinator() - if (checkpointCoordinator != null && savepointCoordinator != null) { + if (checkpointCoordinator != null || savepointCoordinator != null) { future { try { - if (checkpointCoordinator.receiveDeclineMessage(declineMessage)) { + if (checkpointCoordinator != null && + checkpointCoordinator.receiveDeclineMessage(declineMessage)) { // OK, this is the common case } - else { - // Try the savepoint coordinator if the message was not addressed + else if ( + savepointCoordinator != null && + !savepointCoordinator.receiveDeclineMessage(declineMessage)) { + + // Tried the savepoint coordinator if the message was not addressed // to the periodic checkpoint coordinator. 
- if (!savepointCoordinator.receiveDeclineMessage(declineMessage)) { - log.info("Received message for non-existing checkpoint " + - declineMessage.getCheckpointId) - } + log.info("Received message for non-existing checkpoint " + + declineMessage.getCheckpointId) } } catch { case t: Throwable => - log.error(s"Error in CheckpointCoordinator while processing $declineMessage", t) + log.error(s"Error in SavepointCoordinator while processing $declineMessage", t) } }(context.dispatcher) } http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 5c25003..5515c00 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -18,52 +18,70 @@ package org.apache.flink.runtime.jobmanager; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import akka.testkit.JavaTestKit; - import com.typesafe.config.Config; - -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; import 
org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; +import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; import org.apache.flink.runtime.messages.JobManagerMessages.StopJob; import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure; import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess; import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; import org.apache.flink.runtime.messages.TaskMessages.PartitionState; +import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManager; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished; +import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist; +import org.apache.flink.runtime.testingUtils.TestingTaskManager; +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.StoppableInvokable; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; - +import org.junit.rules.TemporaryFolder; +import scala.Option; import scala.Some; import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import java.io.File; import java.net.InetAddress; +import java.util.Collections; +import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT; @@ -73,7 +91,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class JobManagerTest { +public class JobManagerTest extends TestLogger { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); private static ActorSystem system; @@ -342,4 +363,105 @@ public class JobManagerTest { }}; } + /** + * Tests that we can trigger a savepoint when periodic checkpointing is deactivated. + * + * @throws Exception + */ + @Test + public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception { + File defaultSavepointDir = tmpFolder.newFolder(); + + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + + Configuration config = new Configuration(); + config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, 
"filesystem"); + config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath()); + + ActorSystem actorSystem = null; + ActorGateway jobManager = null; + ActorGateway archiver = null; + ActorGateway taskManager = null; + try { + actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + + Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( + config, + actorSystem, + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); + + jobManager = new AkkaActorGateway(master._1(), null); + archiver = new AkkaActorGateway(master._2(), null); + + ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + new Configuration(), + ResourceID.generate(), + actorSystem, + "localhost", + Option.apply("tm"), + Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())), + true, + TestingTaskManager.class); + + taskManager = new AkkaActorGateway(taskManagerRef, null); + + // Wait until connected + Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()); + Await.ready(taskManager.ask(msg, timeout), timeout); + + // Create job graph + JobVertex sourceVertex = new JobVertex("Source"); + sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class); + sourceVertex.setParallelism(1); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); + + JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings( + Collections.singletonList(sourceVertex.getID()), + Collections.singletonList(sourceVertex.getID()), + Collections.singletonList(sourceVertex.getID()), + Long.MAX_VALUE, // deactivated checkpointing + 360000, + 0, + Integer.MAX_VALUE); + + jobGraph.setSnapshotSettings(snapshottingSettings); + + // Submit job graph + msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED); + Await.result(jobManager.ask(msg, 
timeout), timeout); + + // Wait for all tasks to be running + msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()); + Await.result(jobManager.ask(msg, timeout), timeout); + + // Trigger savepoint + msg = new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID()); + Future<Object> future = jobManager.ask(msg, timeout); + Object result = Await.result(future, timeout); + + assertTrue("Did not trigger savepoint", result instanceof JobManagerMessages.TriggerSavepointSuccess); + assertEquals(1, defaultSavepointDir.listFiles().length); + } finally { + if (actorSystem != null) { + actorSystem.shutdown(); + } + + if (archiver != null) { + archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (jobManager != null) { + jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (taskManager != null) { + taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index d6819e1..a63b089 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -20,9 +20,7 @@ package org.apache.flink.streaming.api.graph; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; - import org.apache.commons.lang3.StringUtils; - import org.apache.flink.annotation.Internal; import 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -51,7 +49,6 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -448,47 +445,46 @@ public class StreamingJobGraphGenerator { private void configureCheckpointing() { CheckpointConfig cfg = streamGraph.getCheckpointConfig(); - - if (cfg.isCheckpointingEnabled()) { - long interval = cfg.getCheckpointInterval(); - if (interval < 1) { - throw new IllegalArgumentException("The checkpoint interval must be positive"); - } - - // collect the vertices that receive "trigger checkpoint" messages. - // currently, these are all the sources - List<JobVertexID> triggerVertices = new ArrayList<>(); - - // collect the vertices that need to acknowledge the checkpoint - // currently, these are all vertices - List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size()); - - // collect the vertices that receive "commit checkpoint" messages - // currently, these are all vertices - List<JobVertexID> commitVertices = new ArrayList<>(); - - for (JobVertex vertex : jobVertices.values()) { - if (vertex.isInputVertex()) { - triggerVertices.add(vertex.getID()); - } - // TODO: add check whether the user function implements the checkpointing interface - commitVertices.add(vertex.getID()); - ackVertices.add(vertex.getID()); - } - - JobSnapshottingSettings settings = new JobSnapshottingSettings( - triggerVertices, ackVertices, commitVertices, interval, - cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), - cfg.getMaxConcurrentCheckpoints()); - jobGraph.setSnapshotSettings(settings); + long interval = cfg.getCheckpointInterval(); + if (interval > 0) 
{ // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy if (streamGraph.getExecutionConfig().getRestartStrategy() == null) { // if the user enabled checkpointing, the default number of exec retries is infinite. streamGraph.getExecutionConfig().setRestartStrategy( RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY)); } + } else { + // interval of max value means disable periodic checkpoint + interval = Long.MAX_VALUE; + } + + // collect the vertices that receive "trigger checkpoint" messages. + // currently, these are all the sources + List<JobVertexID> triggerVertices = new ArrayList<>(); + + // collect the vertices that need to acknowledge the checkpoint + // currently, these are all vertices + List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size()); + + // collect the vertices that receive "commit checkpoint" messages + // currently, these are all vertices + List<JobVertexID> commitVertices = new ArrayList<>(); + + for (JobVertex vertex : jobVertices.values()) { + if (vertex.isInputVertex()) { + triggerVertices.add(vertex.getID()); + } + // TODO: add check whether the user function implements the checkpointing interface + commitVertices.add(vertex.getID()); + ackVertices.add(vertex.getID()); } + + JobSnapshottingSettings settings = new JobSnapshottingSettings( + triggerVertices, ackVertices, commitVertices, interval, + cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), + cfg.getMaxConcurrentCheckpoints()); + jobGraph.setSnapshotSettings(settings); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 7f94aa0..d646212 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -17,22 +17,24 @@ package org.apache.flink.streaming.api.graph; -import java.io.IOException; -import java.util.Random; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; - import org.junit.Test; -import static org.junit.Assert.*; +import java.io.IOException; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; @SuppressWarnings("serial") public class StreamingJobGraphGeneratorTest { @@ -145,4 +147,20 @@ public class StreamingJobGraphGeneratorTest { assertEquals(1, jobGraph.getVerticesAsArray()[0].getParallelism()); assertEquals(1, jobGraph.getVerticesAsArray()[1].getParallelism()); } + + /** + * Tests that disabled checkpointing sets the checkpointing interval to Long.MAX_VALUE. 
+ */ + @Test + public void testDisabledCheckpointing() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamGraph streamGraph = new StreamGraph(env); + assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled()); + + StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph); + JobGraph jobGraph = jobGraphGenerator.createJobGraph(); + + JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings(); + assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval()); + } }
