GitHub user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/4254#discussion_r125606557
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
---
@@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws
Exception {
}
/**
+ * Tests that a failed cancel-job-with-savepoint request does not
accidentally disable
+ * periodic checkpoints.
+ */
+ @Test
+ public void testCancelJobWithSavepointFailurePeriodicCheckpoints()
throws Exception {
+ testCancelJobWithSavepointFailure(true);
+ }
+
+ /**
+ * Tests that a failed cancel-job-with-savepoint request does not
accidentally enable
+ * periodic checkpoints.
+ */
+ @Test
+ public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints()
throws Exception {
+ testCancelJobWithSavepointFailure(false);
+ }
+
+ /**
+ * Tests that a failed savepoint does not cancel the job and that there
are no
+ * unintended side effects.
+ *
+ * @param enablePeriodicCheckpoints Flag to indicate whether to enable
periodic checkpoints. We
+ * need to test both here in order to verify that we don't accidentally
disable or enable
+ * checkpoints after a failed cancel-job-with-savepoint request.
+ */
+ private void testCancelJobWithSavepointFailure(
+ boolean enablePeriodicCheckpoints) throws Exception {
+
+ long checkpointInterval = enablePeriodicCheckpoints ? 3600000 :
Long.MAX_VALUE;
+
+ // Savepoint target
+ File savepointTarget = tmpFolder.newFolder();
+ savepointTarget.deleteOnExit();
+
+ // Timeout for Akka messages
+ FiniteDuration askTimeout = new FiniteDuration(30,
TimeUnit.SECONDS);
+
+ // A source that declines savepoints, simulating the behaviour
+ // of a failed savepoint.
+ JobVertex sourceVertex = new JobVertex("Source");
+
sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
+ sourceVertex.setParallelism(1);
+ JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+ final ActorSystem actorSystem =
AkkaUtils.createLocalActorSystem(new Configuration());
+
+ try {
+ Tuple2<ActorRef, ActorRef> master =
JobManager.startJobManagerActors(
+ new Configuration(),
+ actorSystem,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
+ Option.apply("jm"),
+ Option.apply("arch"),
+ TestingJobManager.class,
+ TestingMemoryArchivist.class);
+
+ UUID leaderId =
LeaderRetrievalUtils.retrieveLeaderSessionId(
+
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT());
+
+ ActorGateway jobManager = new
AkkaActorGateway(master._1(), leaderId);
+
+ ActorRef taskManagerRef =
TaskManager.startTaskManagerComponentsAndActor(
+ new Configuration(),
+ ResourceID.generate(),
+ actorSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.apply("tm"),
+ true,
+ TestingTaskManager.class);
+
+ ActorGateway taskManager = new
AkkaActorGateway(taskManagerRef, leaderId);
+
+ // Wait until connected
+ Object msg = new
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+ Await.ready(taskManager.ask(msg, askTimeout),
askTimeout);
+
+ JobCheckpointingSettings snapshottingSettings = new
JobCheckpointingSettings(
+ Collections.singletonList(sourceVertex.getID()),
+ Collections.singletonList(sourceVertex.getID()),
+ Collections.singletonList(sourceVertex.getID()),
+ checkpointInterval,
+ 3600000,
+ 0,
+ Integer.MAX_VALUE,
+ ExternalizedCheckpointSettings.none(),
+ null,
+ true);
+
+ jobGraph.setSnapshotSettings(snapshottingSettings);
+
+ // Submit job graph
+ msg = new JobManagerMessages.SubmitJob(jobGraph,
ListeningBehaviour.DETACHED);
+ Await.ready(jobManager.ask(msg, askTimeout),
askTimeout);
+
+ // Wait for all tasks to be running
+ msg = new
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+ Await.ready(jobManager.ask(msg, askTimeout),
askTimeout);
+
+ // ----------------------------------------------------------------
+ // Super ugly... sorry! But this is one of the few bad options
+ // to test this here. Ideally, we would have a factory that we
+ // can set in tests as desired. But we don't. So here we go...
+ msg = new RequestExecutionGraph(jobGraph.getJobID());
+ Object result = Await.result(jobManager.ask(msg,
askTimeout), askTimeout);
+
+ ExecutionGraph eg;
+ if (result instanceof ExecutionGraphFound) {
+ // Sorry...
+ eg = (ExecutionGraph) ((ExecutionGraphFound)
result).executionGraph();
+ } else {
+ throw new RuntimeException("Could not access ExecutionGraph for job with "
+ + "ID " + jobGraph.getJobID() + ". Response: " + result.toString());
+
+ }
+
+ Field field =
eg.getClass().getDeclaredField("checkpointCoordinator");
+ field.setAccessible(true);
+ CheckpointCoordinator coord = (CheckpointCoordinator)
field.get(eg);
+ CheckpointCoordinator spiedCoord = Mockito.spy(coord);
+ field.set(eg, spiedCoord);
+ // ----------------------------------------------------------------
+
+ // Cancel with savepoint
+ msg = new
JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(),
+ savepointTarget.getAbsolutePath());
+ CancellationResponse cancelResp =
(CancellationResponse) Await.result(
+ jobManager.ask(msg, askTimeout), askTimeout);
+
+ if (cancelResp instanceof CancellationFailure) {
+ if (enablePeriodicCheckpoints) {
+ // Verify checkpoint scheduler deactivated and reactivated.
+ // A call to start checkpoint scheduler calls stop scheduler
+ // again. Therefore, we verify two calls for stop. Since we
+ // spy (I know...) on the coordinator after the job has
+ // started, we don't count calls before spying.
+ verify(spiedCoord, times(1)).startCheckpointScheduler();
--- End diff --
The thing is that stopping the scheduler is part of the expected behaviour
of cancel-job-with-savepoint, because we don't want any checkpoints between
the savepoint and the job cancellation
(https://issues.apache.org/jira/browse/FLINK-4717). I think we do need the
spying for that :-( It was simply not fully tested before... Does this make
sense, or am I missing your point?
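
To make the expected call counts concrete, here is a minimal, self-contained sketch of the behaviour under test. It is not Flink code: `Coordinator`, `CountingCoordinator`, `cancelWithSavepoint` and `runFailedCancel` are hypothetical names, and the restart-only-if-periodic rule is my assumption about the intended behaviour. The hand-rolled counting subclass stands in for the Mockito spy.

```java
// Hypothetical sketch; none of these classes are Flink's real API.
class CancelWithSavepointSketch {

    /** Stand-in for a checkpoint coordinator with a periodic scheduler. */
    static class Coordinator {
        private final boolean periodic;
        private boolean running;

        Coordinator(boolean periodic) {
            this.periodic = periodic;
            this.running = periodic;
        }

        boolean isPeriodic() { return periodic; }

        boolean isRunning() { return running; }

        void startCheckpointScheduler() {
            // As the test comment describes: starting first stops any running scheduler.
            stopCheckpointScheduler();
            running = true;
        }

        void stopCheckpointScheduler() {
            running = false;
        }
    }

    /** Hand-rolled spy: counts the calls made after it was created. */
    static class CountingCoordinator extends Coordinator {
        int starts;
        int stops;

        CountingCoordinator(boolean periodic) { super(periodic); }

        @Override
        void startCheckpointScheduler() {
            starts++;
            super.startCheckpointScheduler();
        }

        @Override
        void stopCheckpointScheduler() {
            stops++;
            super.stopCheckpointScheduler();
        }
    }

    /** Returns true if the job would be cancelled, false if the savepoint failed. */
    static boolean cancelWithSavepoint(Coordinator coord, boolean savepointSucceeds) {
        // No checkpoints between the savepoint and the cancellation.
        coord.stopCheckpointScheduler();
        if (savepointSucceeds) {
            return true;
        }
        // Assumed rule: restore the scheduler only if it was configured before.
        if (coord.isPeriodic()) {
            coord.startCheckpointScheduler();
        }
        return false;
    }

    /** Runs a failed cancel-with-savepoint; returns {starts, stops, running ? 1 : 0}. */
    static int[] runFailedCancel(boolean periodic) {
        CountingCoordinator coord = new CountingCoordinator(periodic);
        cancelWithSavepoint(coord, false);
        return new int[] {coord.starts, coord.stops, coord.isRunning() ? 1 : 0};
    }

    public static void main(String[] args) {
        for (boolean periodic : new boolean[] {true, false}) {
            int[] r = runFailedCancel(periodic);
            System.out.println("periodic=" + periodic + " starts=" + r[0]
                + " stops=" + r[1] + " running=" + (r[2] == 1));
        }
    }
}
```

With periodic checkpoints the sketch records one start and two stops (the explicit stop plus the one inside start); without them it records a single stop and no start. Those counts are only observable by intercepting calls on the coordinator, which is why I don't see a way around the spy here.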