Repository: flink
Updated Branches:
  refs/heads/master b05c3c1b0 -> 398bd9b31


[FLINK-4510] [checkpoint] Always create CheckpointCoordinator

This closes #2453.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/398bd9b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/398bd9b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/398bd9b3

Branch: refs/heads/master
Commit: 398bd9b3198e70f199a1277a9f5074e81cbee3c8
Parents: b05c3c1
Author: Jark Wu <[email protected]>
Authored: Mon Oct 17 19:46:15 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Oct 19 13:50:08 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  10 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |   6 +-
 .../runtime/jobmanager/JobManagerTest.java      | 102 +++++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   |  89 ++++++++--------
 .../graph/StreamingJobGraphGeneratorTest.java   |  33 ++++--
 5 files changed, 178 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index aa9406c..101bdba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -389,9 +389,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                checkpointDir,
                                checkpointStatsTracker);
 
-               // the periodic checkpoint scheduler is activated and 
deactivated as a result of
-               // job status changes (running -> on, all other states -> off)
-               
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
+               // an interval of Long.MAX_VALUE indicates that periodic 
checkpointing is disabled,
+               // so the CheckpointActivatorDeactivator should be created only if 
the interval is not Long.MAX_VALUE
+               if (interval != Long.MAX_VALUE) {
+                       // the periodic checkpoint scheduler is activated and 
deactivated as a result of
+                       // job status changes (running -> on, all other states 
-> off)
+                       
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2a20c6c..bbe10d1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2683,7 +2683,7 @@ public class CheckpointCoordinatorTest {
                                null,
                                true);
 
-               assertEquals(true, triggerResult.isFailure());
+               assertTrue(triggerResult.isFailure());
                
assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, 
triggerResult.getFailureReason());
 
                // Not periodic
@@ -2693,7 +2693,7 @@ public class CheckpointCoordinatorTest {
                                null,
                                false);
 
-               assertEquals(false, triggerResult.isFailure());
+               assertFalse(triggerResult.isFailure());
        }
 
        private void testCreateKeyGroupPartitions(int maxParallelism, int 
parallelism) {
@@ -2851,7 +2851,7 @@ public class CheckpointCoordinatorTest {
                String targetDirectory = "xjasdkjakshdmmmxna";
 
                CheckpointTriggerResult triggerResult = 
coord.triggerCheckpoint(timestamp, props, targetDirectory, false);
-               assertEquals(true, triggerResult.isSuccess());
+               assertTrue(triggerResult.isSuccess());
 
                // validate that we have a pending checkpoint
                assertEquals(1, coord.getNumberOfPendingCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 3c6ae9d..b5e5d45 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -59,6 +59,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
@@ -702,6 +703,107 @@ public class JobManagerTest {
                                
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
                        }
                }
+       }
+
+       /**
+        * Tests that we can trigger a savepoint with deactivated periodic checkpointing.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testSavepointWithDeactivatedPeriodicCheckpointing() throws 
Exception {
+               File defaultSavepointDir = tmpFolder.newFolder();
 
+               FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, 
defaultSavepointDir.getAbsolutePath());
+
+               ActorSystem actorSystem = null;
+               ActorGateway jobManager = null;
+               ActorGateway archiver = null;
+               ActorGateway taskManager = null;
+               try {
+                       actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
+
+                       Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
+                                       config,
+                                       actorSystem,
+                                       Option.apply("jm"),
+                                       Option.apply("arch"),
+                                       TestingJobManager.class,
+                                       TestingMemoryArchivist.class);
+
+                       jobManager = new AkkaActorGateway(master._1(), null);
+                       archiver = new AkkaActorGateway(master._2(), null);
+
+                       ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
+                                       config,
+                                       ResourceID.generate(),
+                                       actorSystem,
+                                       "localhost",
+                                       Option.apply("tm"),
+                                       
Option.<LeaderRetrievalService>apply(new 
StandaloneLeaderRetrievalService(jobManager.path())),
+                                       true,
+                                       TestingTaskManager.class);
+
+                       taskManager = new AkkaActorGateway(taskManagerRef, 
null);
+
+                       // Wait until connected
+                       Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+                       Await.ready(taskManager.ask(msg, timeout), timeout);
+
+                       // Create job graph
+                       JobVertex sourceVertex = new JobVertex("Source");
+                       
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+                       sourceVertex.setParallelism(1);
+
+                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
+
+                       JobSnapshottingSettings snapshottingSettings = new 
JobSnapshottingSettings(
+                                       
Collections.singletonList(sourceVertex.getID()),
+                                       
Collections.singletonList(sourceVertex.getID()),
+                                       
Collections.singletonList(sourceVertex.getID()),
+                                       Long.MAX_VALUE, // deactivated 
checkpointing
+                                       360000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none());
+
+                       jobGraph.setSnapshotSettings(snapshottingSettings);
+
+                       // Submit job graph
+                       msg = new JobManagerMessages.SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED);
+                       Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       // Wait for all tasks to be running
+                       msg = new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+                       Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       // Trigger savepoint
+                       File targetDirectory = tmpFolder.newFolder();
+
+                       msg = new TriggerSavepoint(jobGraph.getJobID(), 
Option.apply(targetDirectory.getAbsolutePath()));
+                       Future<Object> future = jobManager.ask(msg, timeout);
+                       Object result = Await.result(future, timeout);
+
+                       assertTrue("Did not trigger savepoint", result 
instanceof JobManagerMessages.TriggerSavepointSuccess);
+                       assertEquals(1, targetDirectory.listFiles().length);
+               } finally {
+                       if (actorSystem != null) {
+                               actorSystem.shutdown();
+                       }
+
+                       if (archiver != null) {
+                               archiver.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
+                       }
+
+                       if (jobManager != null) {
+                               
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+
+                       if (taskManager != null) {
+                               
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0b7dc2a..824e375 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -464,59 +464,58 @@ public class StreamingJobGraphGenerator {
        
        private void configureCheckpointing() {
                CheckpointConfig cfg = streamGraph.getCheckpointConfig();
-               
-               if (cfg.isCheckpointingEnabled()) {
-                       long interval = cfg.getCheckpointInterval();
-                       if (interval < 1) {
-                               throw new IllegalArgumentException("The 
checkpoint interval must be positive");
-                       }
-
-                       // collect the vertices that receive "trigger 
checkpoint" messages.
-                       // currently, these are all the sources
-                       List<JobVertexID> triggerVertices = new ArrayList<>();
-
-                       // collect the vertices that need to acknowledge the 
checkpoint
-                       // currently, these are all vertices
-                       List<JobVertexID> ackVertices = new 
ArrayList<>(jobVertices.size());
-
-                       // collect the vertices that receive "commit 
checkpoint" messages
-                       // currently, these are all vertices
-                       List<JobVertexID> commitVertices = new ArrayList<>();
-                       
-                       for (JobVertex vertex : jobVertices.values()) {
-                               if (vertex.isInputVertex()) {
-                                       triggerVertices.add(vertex.getID());
-                               }
-                               commitVertices.add(vertex.getID());
-                               ackVertices.add(vertex.getID());
-                       }
-
-                       ExternalizedCheckpointSettings 
externalizedCheckpointSettings;
-                       if (cfg.isExternalizedCheckpointsEnabled()) {
-                               CheckpointConfig.ExternalizedCheckpointCleanup 
cleanup = cfg.getExternalizedCheckpointCleanup();
-                               // Sanity check
-                               if (cleanup == null) {
-                                       throw new 
IllegalStateException("Externalized checkpoints enabled, but no cleanup mode 
configured.");
-                               }
-                               externalizedCheckpointSettings = 
ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
-                       } else {
-                               externalizedCheckpointSettings = 
ExternalizedCheckpointSettings.none();
-                       }
-
-                       JobSnapshottingSettings settings = new 
JobSnapshottingSettings(
-                                       triggerVertices, ackVertices, 
commitVertices, interval,
-                                       cfg.getCheckpointTimeout(), 
cfg.getMinPauseBetweenCheckpoints(),
-                                       cfg.getMaxConcurrentCheckpoints(),
-                                       externalizedCheckpointSettings);
-                       jobGraph.setSnapshotSettings(settings);
 
+               long interval = cfg.getCheckpointInterval();
+               if (interval > 0) {
                        // check if a restart strategy has been set, if not 
then set the FixedDelayRestartStrategy
                        if 
(streamGraph.getExecutionConfig().getRestartStrategy() == null) {
                                // if the user enabled checkpointing, the 
default number of exec retries is infinite.
                                
streamGraph.getExecutionConfig().setRestartStrategy(
                                        
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
                        }
+               } else {
+                       // an interval of Long.MAX_VALUE means that periodic 
checkpointing is disabled
+                       interval = Long.MAX_VALUE;
+               }
+
+               // collect the vertices that receive "trigger checkpoint" 
messages.
+               // currently, these are all the sources
+               List<JobVertexID> triggerVertices = new ArrayList<>();
+
+               // collect the vertices that need to acknowledge the checkpoint
+               // currently, these are all vertices
+               List<JobVertexID> ackVertices = new 
ArrayList<>(jobVertices.size());
+
+               // collect the vertices that receive "commit checkpoint" 
messages
+               // currently, these are all vertices
+               List<JobVertexID> commitVertices = new ArrayList<>();
+
+               for (JobVertex vertex : jobVertices.values()) {
+                       if (vertex.isInputVertex()) {
+                               triggerVertices.add(vertex.getID());
+                       }
+                       commitVertices.add(vertex.getID());
+                       ackVertices.add(vertex.getID());
                }
+
+               ExternalizedCheckpointSettings externalizedCheckpointSettings;
+               if (cfg.isExternalizedCheckpointsEnabled()) {
+                       CheckpointConfig.ExternalizedCheckpointCleanup cleanup 
= cfg.getExternalizedCheckpointCleanup();
+                       // Sanity check
+                       if (cleanup == null) {
+                               throw new IllegalStateException("Externalized 
checkpoints enabled, but no cleanup mode configured.");
+                       }
+                       externalizedCheckpointSettings = 
ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
+               } else {
+                       externalizedCheckpointSettings = 
ExternalizedCheckpointSettings.none();
+               }
+
+               JobSnapshottingSettings settings = new JobSnapshottingSettings(
+                               triggerVertices, ackVertices, commitVertices, 
interval,
+                               cfg.getCheckpointTimeout(), 
cfg.getMinPauseBetweenCheckpoints(),
+                               cfg.getMaxConcurrentCheckpoints(),
+                               externalizedCheckpointSettings);
+               jobGraph.setSnapshotSettings(settings);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6259598..b817c93 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,29 +17,25 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
@@ -159,4 +155,19 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                assertEquals(1, 
jobGraph.getVerticesAsArray()[1].getParallelism());
        }
 
+       /**
+        * Tests that disabled checkpointing sets the checkpointing interval to 
Long.MAX_VALUE.
+        */
+       @Test
+       public void testDisabledCheckpointing() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamGraph streamGraph = new StreamGraph(env);
+               assertFalse("Checkpointing enabled", 
streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+
+               StreamingJobGraphGenerator jobGraphGenerator = new 
StreamingJobGraphGenerator(streamGraph);
+               JobGraph jobGraph = jobGraphGenerator.createJobGraph();
+
+               JobSnapshottingSettings snapshottingSettings = 
jobGraph.getSnapshotSettings();
+               assertEquals(Long.MAX_VALUE, 
snapshottingSettings.getCheckpointInterval());
+       }
 }

Reply via email to