[ https://issues.apache.org/jira/browse/FLINK-7067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074341#comment-16074341 ]

ASF GitHub Bot commented on FLINK-7067:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4254#discussion_r125570975
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
    @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
        }
     
        /**
    +    * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
    +    * periodic checkpoints.
    +    */
    +   @Test
    +   public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
    +           testCancelJobWithSavepointFailure(true);
    +   }
    +
    +   /**
    +    * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
    +    * periodic checkpoints.
    +    */
    +   @Test
    +   public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
    +           testCancelJobWithSavepointFailure(false);
    +   }
    +
    +   /**
    +    * Tests that a failed savepoint does not cancel the job and that there are no
    +    * unintended side effects.
    +    *
    +    * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
    +    * need to test both here in order to verify that we don't accidentally disable or enable
    +    * checkpoints after a failed cancel-job-with-savepoint request.
    +    */
    +   private void testCancelJobWithSavepointFailure(
    +           boolean enablePeriodicCheckpoints) throws Exception {
    +
    +           long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
    +
    +           // Savepoint target
    +           File savepointTarget = tmpFolder.newFolder();
    +           savepointTarget.deleteOnExit();
    +
    +           // Timeout for Akka messages
    +           FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +
    +           // A source that declines savepoints, simulating the behaviour
    +           // of a failed savepoint.
    +           JobVertex sourceVertex = new JobVertex("Source");
    +           sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
    +           sourceVertex.setParallelism(1);
    +           JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
    +
    +           final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    +
    +           try {
    +                   Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    +                           new Configuration(),
    +                           actorSystem,
    +                           TestingUtils.defaultExecutor(),
    +                           TestingUtils.defaultExecutor(),
    +                           highAvailabilityServices,
    +                           Option.apply("jm"),
    +                           Option.apply("arch"),
    +                           TestingJobManager.class,
    +                           TestingMemoryArchivist.class);
    +
    +                   UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
    +                           highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    +                           TestingUtils.TESTING_TIMEOUT());
    +
    +                   ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
    +
    +                   ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +                           new Configuration(),
    +                           ResourceID.generate(),
    +                           actorSystem,
    +                           highAvailabilityServices,
    +                           "localhost",
    +                           Option.apply("tm"),
    +                           true,
    +                           TestingTaskManager.class);
    +
    +                   ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
    +
    +                   // Wait until connected
    +                   Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    +                   Await.ready(taskManager.ask(msg, askTimeout), askTimeout);
    +
    +                   JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
    +                           Collections.singletonList(sourceVertex.getID()),
    +                           Collections.singletonList(sourceVertex.getID()),
    +                           Collections.singletonList(sourceVertex.getID()),
    +                           checkpointInterval,
    +                           3600000,
    +                           0,
    +                           Integer.MAX_VALUE,
    +                           ExternalizedCheckpointSettings.none(),
    +                           null,
    +                           true);
    +
    +                   jobGraph.setSnapshotSettings(snapshottingSettings);
    +
    +                   // Submit job graph
    +                   msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
    +                   Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +                   // Wait for all tasks to be running
    +                   msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
    +                   Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +                   // 
----------------------------------------------------------------
    +                   // Super ugly... sorry! But this is one of the few bad options
    +                   // to test this here. Ideally, we would have a factory that we
    +                   // can set in tests as desired. But we don't. So here we go...
    +                   msg = new RequestExecutionGraph(jobGraph.getJobID());
    +                   Object result = Await.result(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +                   ExecutionGraph eg;
    +                   if (result instanceof ExecutionGraphFound) {
    +                           // Sorry...
    +                           eg = (ExecutionGraph) ((ExecutionGraphFound) result).executionGraph();
    +                   } else {
    +                           throw new RuntimeException("Could not access ExecutionGraph for job with "
    +                                   + "ID " + jobGraph.getJobID() + ". Response: " + result.toString());
    +
    +                   }
    +
    +                   Field field = eg.getClass().getDeclaredField("checkpointCoordinator");
    +                   field.setAccessible(true);
    +                   CheckpointCoordinator coord = (CheckpointCoordinator) field.get(eg);
    +                   CheckpointCoordinator spiedCoord = Mockito.spy(coord);
    +                   field.set(eg, spiedCoord);
    +                   // 
----------------------------------------------------------------
    +
    +                   // Cancel with savepoint
    +                   msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(),
    +                           savepointTarget.getAbsolutePath());
    +                   CancellationResponse cancelResp = (CancellationResponse) Await.result(
    +                           jobManager.ask(msg, askTimeout), askTimeout);
    +
    +                   if (cancelResp instanceof CancellationFailure) {
    +                           if (enablePeriodicCheckpoints) {
    +                                   // Verify checkpoint scheduler deactivated and reactivated.
    +                                   // A call to start checkpoint scheduler calls stop scheduler
    +                                   // again. Therefore, we verify two calls for stop. Since we
    +                                   // spy (I know...) on the coordinator after the job has
    +                                   // started, we don't count calls before spying.
    +                                   verify(spiedCoord, times(1)).startCheckpointScheduler();
    --- End diff --
    
    Could we not re-attempt the cancel-with-savepoint instead? If the coordinator is shut down, the retry will fail; if it was restarted, it should succeed (provided we adjust the failing source to fail only the first time). Then we wouldn't need the spying but would actually just test observable behavior.


> Cancel with savepoint does not restart checkpoint scheduler on failure
> ----------------------------------------------------------------------
>
>                 Key: FLINK-7067
>                 URL: https://issues.apache.org/jira/browse/FLINK-7067
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.1
>            Reporter: Ufuk Celebi
>
> The `CancelWithSavepoint` action of the JobManager first stops the checkpoint
> scheduler, then triggers a savepoint, and cancels the job after the savepoint
> completes.
> If the savepoint fails, the command should have no side effects and the job
> is not cancelled. The issue is that the checkpoint scheduler is not restarted,
> so the job keeps running without periodic checkpoints.


