Repository: flink
Updated Branches:
  refs/heads/release-1.1 16e7c7867 -> b4207503c


[FLINK-4510] [checkpoint] Always create CheckpointCoordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4207503
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4207503
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4207503

Branch: refs/heads/release-1.1
Commit: b4207503c2e674ce501404cbc6cb264647f17397
Parents: 16e7c78
Author: Jark Wu <[email protected]>
Authored: Mon Oct 17 19:46:15 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Oct 27 17:30:50 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  46 ++++---
 .../flink/runtime/jobmanager/JobManager.scala   |  44 +++---
 .../runtime/jobmanager/JobManagerTest.java      | 138 +++++++++++++++++--
 .../api/graph/StreamingJobGraphGenerator.java   |  68 +++++----
 .../graph/StreamingJobGraphGeneratorTest.java   |  28 +++-
 6 files changed, 234 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6aaa014..409f05b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -411,7 +411,7 @@ public class CheckpointCoordinator {
                        }
 
                        //make sure the minimum interval between checkpoints 
has passed
-                       if (lastTriggeredCheckpoint + 
minPauseBetweenCheckpoints > timestamp) {
+                       if (lastTriggeredCheckpoint + 
minPauseBetweenCheckpoints > timestamp && baseInterval != Long.MAX_VALUE) {
                                if (currentPeriodicTrigger != null) {
                                        currentPeriodicTrigger.cancel();
                                        currentPeriodicTrigger = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8cf8354..cc8e75d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -397,27 +397,31 @@ public class ExecutionGraph implements Serializable {
 
                checkpointStatsTracker = Objects.requireNonNull(statsTracker, 
"Checkpoint stats tracker");
 
-               // create the coordinator that triggers and commits checkpoints 
and holds the state
-               checkpointCoordinator = new CheckpointCoordinator(
-                               jobID,
-                               interval,
-                               checkpointTimeout,
-                               minPauseBetweenCheckpoints,
-                               maxConcurrentCheckpoints,
-                               numberKeyGroups,
-                               tasksToTrigger,
-                               tasksToWaitFor,
-                               tasksToCommitTo,
-                               userClassLoader,
-                               checkpointIDCounter,
-                               checkpointStore,
-                               recoveryMode,
-                               checkpointStatsTracker);
-
-               // the periodic checkpoint scheduler is activated and 
deactivated as a result of
-               // job status changes (running -> on, all other states -> off)
-               registerJobStatusListener(
-                               
checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
+               // An interval of Long.MAX_VALUE disables periodic checkpoints,
+               // so the CheckpointCoordinator is only created otherwise.
+               if (interval != Long.MAX_VALUE) {
+                       // create the coordinator that triggers and commits 
checkpoints and holds the state
+                       checkpointCoordinator = new CheckpointCoordinator(
+                                       jobID,
+                                       interval,
+                                       checkpointTimeout,
+                                       minPauseBetweenCheckpoints,
+                                       maxConcurrentCheckpoints,
+                                       numberKeyGroups,
+                                       tasksToTrigger,
+                                       tasksToWaitFor,
+                                       tasksToCommitTo,
+                                       userClassLoader,
+                                       checkpointIDCounter,
+                                       checkpointStore,
+                                       recoveryMode,
+                                       checkpointStatsTracker);
+
+                       // the periodic checkpoint scheduler is activated and 
deactivated as a result of
+                       // job status changes (running -> on, all other states 
-> off)
+                       registerJobStatusListener(
+                                       
checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
+               }
 
                // Savepoint Coordinator
                savepointCoordinator = new SavepointCoordinator(

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a4c2403..f16747a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1360,22 +1360,22 @@ class JobManager(
             val checkpointCoordinator = graph.getCheckpointCoordinator()
             val savepointCoordinator = graph.getSavepointCoordinator()
 
-            if (checkpointCoordinator != null && savepointCoordinator != null) 
{
+            if (checkpointCoordinator != null || savepointCoordinator != null) 
{
               future {
                 try {
-                  if 
(checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
+                  if (checkpointCoordinator != null &&
+                      
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
                     // OK, this is the common case
+                  } else if (
+                    savepointCoordinator != null &&
+                    
!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
+
+                    // Tried the savepoint coordinator if the message was not
+                    // addressed to the periodic checkpoint coordinator.
+                    log.info("Received message for non-existing checkpoint " +
+                      ackMessage.getCheckpointId)
                   }
-                  else {
-                    // Try the savepoint coordinator if the message was not 
addressed
-                    // to the periodic checkpoint coordinator.
-                    if 
(!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
-                      log.info("Received message for non-existing checkpoint " 
+
-                        ackMessage.getCheckpointId)
-                    }
-                  }
-                }
-                catch {
+                } catch {
                   case t: Throwable =>
                     log.error(s"Error in CheckpointCoordinator while 
processing $ackMessage", t)
                 }
@@ -1397,24 +1397,26 @@ class JobManager(
             val checkpointCoordinator = graph.getCheckpointCoordinator()
             val savepointCoordinator = graph.getSavepointCoordinator()
 
-            if (checkpointCoordinator != null && savepointCoordinator != null) 
{
+            if (checkpointCoordinator != null || savepointCoordinator != null) 
{
               future {
                 try {
-                  if 
(checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
+                  if (checkpointCoordinator != null &&
+                    
checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
                     // OK, this is the common case
                   }
-                  else {
-                    // Try the savepoint coordinator if the message was not 
addressed
+                  else if (
+                    savepointCoordinator != null &&
+                    
!savepointCoordinator.receiveDeclineMessage(declineMessage)) {
+
+                    // Tried the savepoint coordinator if the message was not 
addressed
                     // to the periodic checkpoint coordinator.
-                    if 
(!savepointCoordinator.receiveDeclineMessage(declineMessage)) {
-                      log.info("Received message for non-existing checkpoint " 
+
-                        declineMessage.getCheckpointId)
-                    }
+                    log.info("Received message for non-existing checkpoint " +
+                      declineMessage.getCheckpointId)
                   }
                 }
                 catch {
                   case t: Throwable =>
-                    log.error(s"Error in CheckpointCoordinator while 
processing $declineMessage", t)
+                    log.error(s"Error in SavepointCoordinator while processing 
$declineMessage", t)
                 }
               }(context.dispatcher)
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 5c25003..5515c00 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -18,52 +18,70 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
-
 import com.typesafe.config.Config;
-
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
+import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
-
+import org.junit.rules.TemporaryFolder;
+import scala.Option;
 import scala.Some;
 import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
+import java.io.File;
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
 import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
@@ -73,7 +91,10 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class JobManagerTest {
+public class JobManagerTest extends TestLogger {
+
+       @Rule
+       public TemporaryFolder tmpFolder = new TemporaryFolder();
 
        private static ActorSystem system;
 
@@ -342,4 +363,105 @@ public class JobManagerTest {
                }};
        }
 
+       /**
+        * Tests that we can trigger a savepoint without periodic checkpointing.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testSavepointWithDeactivatedPeriodicCheckpointing() throws 
Exception {
+               File defaultSavepointDir = tmpFolder.newFolder();
+
+               FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+
+               Configuration config = new Configuration();
+               config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, 
"filesystem");
+               config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, 
defaultSavepointDir.getAbsolutePath());
+
+               ActorSystem actorSystem = null;
+               ActorGateway jobManager = null;
+               ActorGateway archiver = null;
+               ActorGateway taskManager = null;
+               try {
+                       actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
+
+                       Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
+                                       config,
+                                       actorSystem,
+                                       Option.apply("jm"),
+                                       Option.apply("arch"),
+                                       TestingJobManager.class,
+                                       TestingMemoryArchivist.class);
+
+                       jobManager = new AkkaActorGateway(master._1(), null);
+                       archiver = new AkkaActorGateway(master._2(), null);
+
+                       ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
+                                       new Configuration(),
+                                       ResourceID.generate(),
+                                       actorSystem,
+                                       "localhost",
+                                       Option.apply("tm"),
+                                       
Option.<LeaderRetrievalService>apply(new 
StandaloneLeaderRetrievalService(jobManager.path())),
+                                       true,
+                                       TestingTaskManager.class);
+
+                       taskManager = new AkkaActorGateway(taskManagerRef, 
null);
+
+                       // Wait until connected
+                       Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+                       Await.ready(taskManager.ask(msg, timeout), timeout);
+
+                       // Create job graph
+                       JobVertex sourceVertex = new JobVertex("Source");
+                       
sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
+                       sourceVertex.setParallelism(1);
+
+                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
+
+                       JobSnapshottingSettings snapshottingSettings = new 
JobSnapshottingSettings(
+                                       
Collections.singletonList(sourceVertex.getID()),
+                                       
Collections.singletonList(sourceVertex.getID()),
+                                       
Collections.singletonList(sourceVertex.getID()),
+                                       Long.MAX_VALUE, // deactivated 
checkpointing
+                                       360000,
+                                       0,
+                                       Integer.MAX_VALUE);
+
+                       jobGraph.setSnapshotSettings(snapshottingSettings);
+
+                       // Submit job graph
+                       msg = new JobManagerMessages.SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED);
+                       Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       // Wait for all tasks to be running
+                       msg = new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+                       Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       // Trigger savepoint
+                       msg = new 
JobManagerMessages.TriggerSavepoint(jobGraph.getJobID());
+                       Future<Object> future = jobManager.ask(msg, timeout);
+                       Object result = Await.result(future, timeout);
+
+                       assertTrue("Did not trigger savepoint", result 
instanceof JobManagerMessages.TriggerSavepointSuccess);
+                       assertEquals(1, defaultSavepointDir.listFiles().length);
+               } finally {
+                       if (actorSystem != null) {
+                               actorSystem.shutdown();
+                       }
+
+                       if (archiver != null) {
+                               archiver.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
+                       }
+
+                       if (jobManager != null) {
+                               
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+
+                       if (taskManager != null) {
+                               
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d6819e1..a63b089 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -20,9 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-
 import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -51,7 +49,6 @@ import 
org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -448,47 +445,46 @@ public class StreamingJobGraphGenerator {
        
        private void configureCheckpointing() {
                CheckpointConfig cfg = streamGraph.getCheckpointConfig();
-               
-               if (cfg.isCheckpointingEnabled()) {
-                       long interval = cfg.getCheckpointInterval();
-                       if (interval < 1) {
-                               throw new IllegalArgumentException("The 
checkpoint interval must be positive");
-                       }
-
-                       // collect the vertices that receive "trigger 
checkpoint" messages.
-                       // currently, these are all the sources
-                       List<JobVertexID> triggerVertices = new ArrayList<>();
-
-                       // collect the vertices that need to acknowledge the 
checkpoint
-                       // currently, these are all vertices
-                       List<JobVertexID> ackVertices = new 
ArrayList<>(jobVertices.size());
-
-                       // collect the vertices that receive "commit 
checkpoint" messages
-                       // currently, these are all vertices
-                       List<JobVertexID> commitVertices = new ArrayList<>();
-                       
-                       for (JobVertex vertex : jobVertices.values()) {
-                               if (vertex.isInputVertex()) {
-                                       triggerVertices.add(vertex.getID());
-                               }
-                               // TODO: add check whether the user function 
implements the checkpointing interface
-                               commitVertices.add(vertex.getID());
-                               ackVertices.add(vertex.getID());
-                       }
-
-                       JobSnapshottingSettings settings = new 
JobSnapshottingSettings(
-                                       triggerVertices, ackVertices, 
commitVertices, interval,
-                                       cfg.getCheckpointTimeout(), 
cfg.getMinPauseBetweenCheckpoints(),
-                                       cfg.getMaxConcurrentCheckpoints());
-                       jobGraph.setSnapshotSettings(settings);
 
+               long interval = cfg.getCheckpointInterval();
+               if (interval > 0) {
                        // check if a restart strategy has been set, if not 
then set the FixedDelayRestartStrategy
                        if 
(streamGraph.getExecutionConfig().getRestartStrategy() == null) {
                                // if the user enabled checkpointing, the 
default number of exec retries is infinite.
                                
streamGraph.getExecutionConfig().setRestartStrategy(
                                        
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
                        }
+               } else {
+                       // Long.MAX_VALUE disables periodic checkpointing
+                       interval = Long.MAX_VALUE;
+               }
+
+               // collect the vertices that receive "trigger checkpoint" 
messages.
+               // currently, these are all the sources
+               List<JobVertexID> triggerVertices = new ArrayList<>();
+
+               // collect the vertices that need to acknowledge the checkpoint
+               // currently, these are all vertices
+               List<JobVertexID> ackVertices = new 
ArrayList<>(jobVertices.size());
+
+               // collect the vertices that receive "commit checkpoint" 
messages
+               // currently, these are all vertices
+               List<JobVertexID> commitVertices = new ArrayList<>();
+
+               for (JobVertex vertex : jobVertices.values()) {
+                       if (vertex.isInputVertex()) {
+                               triggerVertices.add(vertex.getID());
+                       }
+                       // TODO: add check whether the user function implements 
the checkpointing interface
+                       commitVertices.add(vertex.getID());
+                       ackVertices.add(vertex.getID());
                }
+
+               JobSnapshottingSettings settings = new JobSnapshottingSettings(
+                               triggerVertices, ackVertices, commitVertices, 
interval,
+                               cfg.getCheckpointTimeout(), 
cfg.getMinPauseBetweenCheckpoints(),
+                               cfg.getMaxConcurrentCheckpoints());
+               jobGraph.setSnapshotSettings(settings);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7f94aa0..d646212 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,22 +17,24 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest {
@@ -145,4 +147,20 @@ public class StreamingJobGraphGeneratorTest {
                assertEquals(1, 
jobGraph.getVerticesAsArray()[0].getParallelism());
                assertEquals(1, 
jobGraph.getVerticesAsArray()[1].getParallelism());
        }
+
+       /**
+        * Tests that disabled checkpointing sets the checkpointing interval to 
Long.MAX_VALUE.
+        */
+       @Test
+       public void testDisabledCheckpointing() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamGraph streamGraph = new StreamGraph(env);
+               assertFalse("Checkpointing enabled", 
streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+
+               StreamingJobGraphGenerator jobGraphGenerator = new 
StreamingJobGraphGenerator(streamGraph);
+               JobGraph jobGraph = jobGraphGenerator.createJobGraph();
+
+               JobSnapshottingSettings snapshottingSettings = 
jobGraph.getSnapshotSettings();
+               assertEquals(Long.MAX_VALUE, 
snapshottingSettings.getCheckpointInterval());
+       }
 }

Reply via email to