Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4254#discussion_r125606557
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
    @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
        }
     
       /**
    +    * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
    +    * periodic checkpoints.
    +    */
    +   @Test
    +   public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
    +           testCancelJobWithSavepointFailure(true);
    +   }
    +
    +   /**
    +    * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
    +    * periodic checkpoints.
    +    */
    +   @Test
    +   public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
    +           testCancelJobWithSavepointFailure(false);
    +   }
    +
    +   /**
    +    * Tests that a failed savepoint does not cancel the job and that there are no
    +    * unintended side effects.
    +    *
    +    * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
    +    * need to test both here in order to verify that we don't accidentally disable or enable
    +    * checkpoints after a failed cancel-job-with-savepoint request.
    +    */
    +   private void testCancelJobWithSavepointFailure(
    +           boolean enablePeriodicCheckpoints) throws Exception {
    +
    +           long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
    +
    +           // Savepoint target
    +           File savepointTarget = tmpFolder.newFolder();
    +           savepointTarget.deleteOnExit();
    +
    +           // Timeout for Akka messages
    +           FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +
    +           // A source that declines savepoints, simulating the behaviour
    +           // of a failed savepoint.
    +           JobVertex sourceVertex = new JobVertex("Source");
    +           sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
    +           sourceVertex.setParallelism(1);
    +           JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
    +
    +           final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    +
    +           try {
    +                   Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    +                           new Configuration(),
    +                           actorSystem,
    +                           TestingUtils.defaultExecutor(),
    +                           TestingUtils.defaultExecutor(),
    +                           highAvailabilityServices,
    +                           Option.apply("jm"),
    +                           Option.apply("arch"),
    +                           TestingJobManager.class,
    +                           TestingMemoryArchivist.class);
    +
    +                   UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
    +                           highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    +                           TestingUtils.TESTING_TIMEOUT());
    +
    +                   ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
    +
    +                   ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +                           new Configuration(),
    +                           ResourceID.generate(),
    +                           actorSystem,
    +                           highAvailabilityServices,
    +                           "localhost",
    +                           Option.apply("tm"),
    +                           true,
    +                           TestingTaskManager.class);
    +
    +                   ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
    +
    +                   // Wait until connected
    +                   Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    +                   Await.ready(taskManager.ask(msg, askTimeout), askTimeout);
    +
    +                   JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
    +                           Collections.singletonList(sourceVertex.getID()),
    +                           Collections.singletonList(sourceVertex.getID()),
    +                           Collections.singletonList(sourceVertex.getID()),
    +                           checkpointInterval,
    +                           3600000,
    +                           0,
    +                           Integer.MAX_VALUE,
    +                           ExternalizedCheckpointSettings.none(),
    +                           null,
    +                           true);
    +
    +                   jobGraph.setSnapshotSettings(snapshottingSettings);
    +
    +                   // Submit job graph
    +                   msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
    +                   Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +                   // Wait for all tasks to be running
    +                   msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
    +                   Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +                   // ----------------------------------------------------------------
    +                   // Super ugly... sorry! But this is one of the few bad options
    +                   // to test this here. Ideally, we would have a factory that we
    +                   // can set in tests as desired. But we don't. So here we go...
    +                   msg = new RequestExecutionGraph(jobGraph.getJobID());
    +                   Object result = Await.result(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +                   ExecutionGraph eg;
    +                   if (result instanceof ExecutionGraphFound) {
    +                           // Sorry...
    +                           eg = (ExecutionGraph) ((ExecutionGraphFound) result).executionGraph();
    +                   } else {
    +                           throw new RuntimeException("Could not access ExecutionGraph for job with "
    +                                   + "ID " + jobGraph.getJobID() + ". Response: " + result.toString());
    +
    +                   }
    +
    +                   Field field = eg.getClass().getDeclaredField("checkpointCoordinator");
    +                   field.setAccessible(true);
    +                   CheckpointCoordinator coord = (CheckpointCoordinator) field.get(eg);
    +                   CheckpointCoordinator spiedCoord = Mockito.spy(coord);
    +                   field.set(eg, spiedCoord);
    +                   // ----------------------------------------------------------------
    +
    +                   // Cancel with savepoint
    +                   msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(),
    +                           savepointTarget.getAbsolutePath());
    +                   CancellationResponse cancelResp = (CancellationResponse) Await.result(
    +                           jobManager.ask(msg, askTimeout), askTimeout);
    +
    +                   if (cancelResp instanceof CancellationFailure) {
    +                           if (enablePeriodicCheckpoints) {
    +                                   // Verify checkpoint scheduler deactivated and reactivated.
    +                                   // A call to start checkpoint scheduler calls stop scheduler
    +                                   // again. Therefore, we verify two calls for stop. Since we
    +                                   // spy (I know...) on the coordinator after the job has
    +                                   // started, we don't count calls before spying.
    +                                   verify(spiedCoord, times(1)).startCheckpointScheduler();
    --- End diff --
    
    The thing is that stopping the scheduler is part of the expected behaviour of
    cancel-job-with-savepoint, because we don't want any checkpoints between the
    savepoint and the job cancellation
    (https://issues.apache.org/jira/browse/FLINK-4717). I think for that we do
    need the spying :-( It was simply not fully tested before... Does this make
    sense or am I missing your point?
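
    To make that expected behaviour concrete, the verification this leads to looks
    roughly like the following sketch (continuing the snippet in the diff for the
    periodic-checkpoints case; `spiedCoord` is the coordinator spy installed via
    reflection there, and the stop count follows the inline comment):

        verify(spiedCoord, times(1)).startCheckpointScheduler();
        // startCheckpointScheduler() stops any running scheduler before (re)starting it,
        // so a failed cancel-job-with-savepoint is expected to produce two stop calls in
        // total: one when the savepoint is triggered and one from the restart.
        verify(spiedCoord, times(2)).stopCheckpointScheduler();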


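    As an aside, the "factory that we can set in tests" wished for in the inline
    comment of the diff could look roughly like the sketch below. This is purely
    hypothetical (no such hook exists here); it only illustrates how a spy could be
    injected without the reflection hack:

        // Hypothetical interface, not existing Flink API; names are placeholders.
        public interface CheckpointCoordinatorFactory {
            CheckpointCoordinator create(JobCheckpointingSettings settings);
        }

        // A test could then install a spying factory before submitting the job, e.g.:
        //   executionGraphBuilder.setCheckpointCoordinatorFactory(
        //           settings -> Mockito.spy(defaultFactory.create(settings)));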
