Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4254#discussion_r145907545
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
}
/**
+ * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
+ * periodic checkpoints.
+ */
+ @Test
+ public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
+ testCancelJobWithSavepointFailure(true);
+ }
+
+ /**
+ * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
+ * periodic checkpoints.
+ */
+ @Test
+ public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
+ testCancelJobWithSavepointFailure(false);
+ }
+
+ /**
+ * Tests that a failed savepoint does not cancel the job and that there are no
+ * unintended side effects.
+ *
+ * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
+ * need to test both here in order to verify that we don't accidentally disable or enable
+ * checkpoints after a failed cancel-job-with-savepoint request.
+ */
+ private void testCancelJobWithSavepointFailure(
+ boolean enablePeriodicCheckpoints) throws Exception {
+
+ long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
+
+ // Savepoint target
+ File savepointTarget = tmpFolder.newFolder();
+ savepointTarget.deleteOnExit();
+
+ // Timeout for Akka messages
+ FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+ // A source that declines savepoints, simulating the behaviour
+ // of a failed savepoint.
+ JobVertex sourceVertex = new JobVertex("Source");
+ sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
+ sourceVertex.setParallelism(1);
+ JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+ final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+ try {
+ Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+ new Configuration(),
+ actorSystem,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
+ Option.apply("jm"),
+ Option.apply("arch"),
+ TestingJobManager.class,
+ TestingMemoryArchivist.class);
+
+ UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT());
+
+ ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
+
+ ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+ new Configuration(),
+ ResourceID.generate(),
+ actorSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.apply("tm"),
+ true,
+ TestingTaskManager.class);
+
+ ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
--- End diff --
Can't we simply use a `TestingCluster` here for all the setup work?
---