Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/4254#discussion_r145923787 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java --- @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception { } /** + * Tests that a failed cancel-job-with-savepoint request does not accidentally disable + * periodic checkpoints. + */ + @Test + public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception { + testCancelJobWithSavepointFailure(true); + } + + /** + * Tests that a failed cancel-job-with-savepoint request does not accidentally enable + * periodic checkpoints. + */ + @Test + public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception { + testCancelJobWithSavepointFailure(false); + } + + /** + * Tests that a failed savepoint does not cancel the job and that there are no + * unintended side effects. + * + * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We + * need to test both here in order to verify that we don't accidentally disable or enable + * checkpoints after a failed cancel-job-with-savepoint request. + */ + private void testCancelJobWithSavepointFailure( + boolean enablePeriodicCheckpoints) throws Exception { + + long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE; + + // Savepoint target + File savepointTarget = tmpFolder.newFolder(); + savepointTarget.deleteOnExit(); + + // Timeout for Akka messages + FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); + + // A source that declines savepoints, simulating the behaviour + // of a failed savepoint. + JobVertex sourceVertex = new JobVertex("Source"); + sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class); + sourceVertex.setParallelism(1); + JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); + + final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + + try { + Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( + new Configuration(), + actorSystem, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + highAvailabilityServices, + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); + + UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId( + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + TestingUtils.TESTING_TIMEOUT()); + + ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId); + + ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + new Configuration(), + ResourceID.generate(), + actorSystem, + highAvailabilityServices, + "localhost", + Option.apply("tm"), + true, + TestingTaskManager.class); + + ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId); --- End diff -- Definitely +1
---