[ https://issues.apache.org/jira/browse/FLINK-7067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074542#comment-16074542 ]
ASF GitHub Bot commented on FLINK-7067:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/4254#discussion_r125606557
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
}
/**
+ * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
+ * periodic checkpoints.
+ */
+ @Test
+ public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
+     testCancelJobWithSavepointFailure(true);
+ }
+
+ /**
+ * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
+ * periodic checkpoints.
+ */
+ @Test
+ public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
+     testCancelJobWithSavepointFailure(false);
+ }
+
+ /**
+ * Tests that a failed savepoint does not cancel the job and that there are no
+ * unintended side effects.
+ *
+ * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
+ * need to test both here in order to verify that we don't accidentally disable or enable
+ * checkpoints after a failed cancel-job-with-savepoint request.
+ */
+ private void testCancelJobWithSavepointFailure(
+         boolean enablePeriodicCheckpoints) throws Exception {
+
+     long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
+
+     // Savepoint target
+     File savepointTarget = tmpFolder.newFolder();
+     savepointTarget.deleteOnExit();
+
+     // Timeout for Akka messages
+     FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+     // A source that declines savepoints, simulating the behaviour
+     // of a failed savepoint.
+     JobVertex sourceVertex = new JobVertex("Source");
+     sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
+     sourceVertex.setParallelism(1);
+     JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+     final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+     try {
+         Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+             new Configuration(),
+             actorSystem,
+             TestingUtils.defaultExecutor(),
+             TestingUtils.defaultExecutor(),
+             highAvailabilityServices,
+             Option.apply("jm"),
+             Option.apply("arch"),
+             TestingJobManager.class,
+             TestingMemoryArchivist.class);
+
+         UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+             highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+             TestingUtils.TESTING_TIMEOUT());
+
+         ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
+
+         ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+             new Configuration(),
+             ResourceID.generate(),
+             actorSystem,
+             highAvailabilityServices,
+             "localhost",
+             Option.apply("tm"),
+             true,
+             TestingTaskManager.class);
+
+         ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
+
+         // Wait until connected
+         Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+         Await.ready(taskManager.ask(msg, askTimeout), askTimeout);
+
+         JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
+             Collections.singletonList(sourceVertex.getID()),
+             Collections.singletonList(sourceVertex.getID()),
+             Collections.singletonList(sourceVertex.getID()),
+             checkpointInterval,
+             3600000,
+             0,
+             Integer.MAX_VALUE,
+             ExternalizedCheckpointSettings.none(),
+             null,
+             true);
+
+         jobGraph.setSnapshotSettings(snapshottingSettings);
+
+         // Submit job graph
+         msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+         Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
+
+         // Wait for all tasks to be running
+         msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+         Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
+
+         // ----------------------------------------------------------------
+         // Super ugly... sorry! But this is one of the few bad options
+         // to test this here. Ideally, we would have a factory that we
+         // can set in tests as desired. But we don't. So here we go...
+         msg = new RequestExecutionGraph(jobGraph.getJobID());
+         Object result = Await.result(jobManager.ask(msg, askTimeout), askTimeout);
+
+         ExecutionGraph eg;
+         if (result instanceof ExecutionGraphFound) {
+             // Sorry...
+             eg = (ExecutionGraph) ((ExecutionGraphFound) result).executionGraph();
+         } else {
+             throw new RuntimeException("Could not access ExecutionGraph for job with "
+                 + "ID " + jobGraph.getJobID() + ". Response: " + result.toString());
+
+         }
+
+         Field field = eg.getClass().getDeclaredField("checkpointCoordinator");
+         field.setAccessible(true);
+         CheckpointCoordinator coord = (CheckpointCoordinator) field.get(eg);
+         CheckpointCoordinator spiedCoord = Mockito.spy(coord);
+         field.set(eg, spiedCoord);
+         // ----------------------------------------------------------------
+
+         // Cancel with savepoint
+         msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(),
+             savepointTarget.getAbsolutePath());
+         CancellationResponse cancelResp = (CancellationResponse) Await.result(
+             jobManager.ask(msg, askTimeout), askTimeout);
+
+         if (cancelResp instanceof CancellationFailure) {
+             if (enablePeriodicCheckpoints) {
+                 // Verify checkpoint scheduler deactivated and reactivated.
+                 // A call to start checkpoint scheduler calls stop scheduler
+                 // again. Therefore, we verify two calls for stop. Since we
+                 // spy (I know...) on the coordinator after the job has
+                 // started, we don't count calls before spying.
+                 verify(spiedCoord, times(1)).startCheckpointScheduler();
--- End diff --
The thing is that the stopping of the scheduler is part of the expected
behaviour of cancel-job-with-savepoint, because we don't want any checkpoints
between the savepoint and the job cancellation
(https://issues.apache.org/jira/browse/FLINK-4717). I think for that we do need
the spying :-( It was simply not fully tested before... Does this make sense or
am I missing your point?
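
For readers following the verification counts in the diff above: they rely on the coordinator behaviour that starting the checkpoint scheduler first stops any running scheduler, which is why a single restart shows up as one start call and two stop calls on the spy. Below is a minimal, self-contained sketch of that spy-and-count pattern; the Scheduler class is a hypothetical stand-in for the real CheckpointCoordinator, not Flink code.

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class SchedulerSpySketch {

    /** Hypothetical stand-in for the scheduler side of the CheckpointCoordinator. */
    static class Scheduler {
        private boolean running = true;

        void stopCheckpointScheduler() {
            running = false;
        }

        void startCheckpointScheduler() {
            // Mirrors the comment in the test: starting the scheduler
            // stops any running scheduler first.
            stopCheckpointScheduler();
            running = true;
        }

        boolean isRunning() {
            return running;
        }
    }

    public static void main(String[] args) {
        Scheduler spied = spy(new Scheduler());

        // Simulate a cancel-job-with-savepoint whose savepoint fails:
        // the scheduler is stopped up front and must be restarted afterwards.
        spied.stopCheckpointScheduler();
        spied.startCheckpointScheduler();

        // One explicit stop plus the stop inside start() -> two stops, one start.
        verify(spied, times(1)).startCheckpointScheduler();
        verify(spied, times(2)).stopCheckpointScheduler();

        if (!spied.isRunning()) {
            throw new AssertionError("Scheduler should be running again after the failed savepoint");
        }
    }
}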
> Cancel with savepoint does not restart checkpoint scheduler on failure
> ----------------------------------------------------------------------
>
> Key: FLINK-7067
> URL: https://issues.apache.org/jira/browse/FLINK-7067
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.1
> Reporter: Ufuk Celebi
>
> The `CancelWithSavepoint` action of the JobManager first stops the checkpoint
> scheduler, then triggers a savepoint, and cancels the job after the savepoint
> completes.
> If the savepoint fails, the command should have no side effects and the job
> is not cancelled. The issue is that the checkpoint scheduler is not restarted
> in that case.
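
For context, here is a rough sketch of the control flow the fix implies, assuming a failed savepoint should simply re-enable periodic checkpoints. The CheckpointScheduler and SavepointTrigger interfaces are hypothetical stand-ins used only for illustration; the actual change lives in the JobManager's CancelJobWithSavepoint handling.

import java.util.concurrent.CompletableFuture;

public class CancelWithSavepointFlowSketch {

    /** Hypothetical handle to the periodic checkpoint scheduler. */
    interface CheckpointScheduler {
        void stopCheckpointScheduler();
        void startCheckpointScheduler();
        boolean isPeriodicCheckpointingConfigured();
    }

    /** Hypothetical savepoint trigger; the future completes exceptionally on failure. */
    interface SavepointTrigger {
        CompletableFuture<String> triggerSavepoint(String targetDirectory);
    }

    static CompletableFuture<String> cancelWithSavepoint(
            CheckpointScheduler scheduler,
            SavepointTrigger trigger,
            String targetDirectory,
            Runnable cancelJob) {

        // 1) Stop periodic checkpoints so that no checkpoint can be triggered
        //    between the savepoint and the job cancellation (FLINK-4717).
        scheduler.stopCheckpointScheduler();

        // 2) Trigger the savepoint.
        return trigger.triggerSavepoint(targetDirectory)
            .whenComplete((savepointPath, failure) -> {
                if (failure == null) {
                    // 3a) Savepoint succeeded: cancel the job.
                    cancelJob.run();
                } else if (scheduler.isPeriodicCheckpointingConfigured()) {
                    // 3b) Savepoint failed: the job keeps running, so periodic
                    //     checkpoints have to be re-enabled. Missing this step
                    //     is the bug described above.
                    scheduler.startCheckpointScheduler();
                }
            });
    }
}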
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)