[ https://issues.apache.org/jira/browse/FLINK-7067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212374#comment-16212374 ]

ASF GitHub Bot commented on FLINK-7067:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4254#discussion_r145907545
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
    @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
        }
     
        /**
    +    * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
    +    * periodic checkpoints.
    +    */
    +   @Test
    +   public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
    +           testCancelJobWithSavepointFailure(true);
    +   }
    +
    +   /**
    +    * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
    +    * periodic checkpoints.
    +    */
    +   @Test
    +   public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
    +           testCancelJobWithSavepointFailure(false);
    +   }
    +
    +   /**
    +    * Tests that a failed savepoint does not cancel the job and that there are no
    +    * unintended side effects.
    +    *
    +    * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
    +    * need to test both here in order to verify that we don't accidentally disable or enable
    +    * checkpoints after a failed cancel-job-with-savepoint request.
    +    */
    +   private void testCancelJobWithSavepointFailure(
    +           boolean enablePeriodicCheckpoints) throws Exception {
    +
    +           long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
    +
    +           // Savepoint target
    +           File savepointTarget = tmpFolder.newFolder();
    +           savepointTarget.deleteOnExit();
    +
    +           // Timeout for Akka messages
    +           FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +
    +           // A source that declines savepoints, simulating the behaviour
    +           // of a failed savepoint.
    +           JobVertex sourceVertex = new JobVertex("Source");
    +           sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
    +           sourceVertex.setParallelism(1);
    +           JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
    +
    +           final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    +
    +           try {
    +                   Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    +                           new Configuration(),
    +                           actorSystem,
    +                           TestingUtils.defaultExecutor(),
    +                           TestingUtils.defaultExecutor(),
    +                           highAvailabilityServices,
    +                           Option.apply("jm"),
    +                           Option.apply("arch"),
    +                           TestingJobManager.class,
    +                           TestingMemoryArchivist.class);
    +
    +                   UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
    +                           highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    +                           TestingUtils.TESTING_TIMEOUT());
    +
    +                   ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
    +
    +                   ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +                           new Configuration(),
    +                           ResourceID.generate(),
    +                           actorSystem,
    +                           highAvailabilityServices,
    +                           "localhost",
    +                           Option.apply("tm"),
    +                           true,
    +                           TestingTaskManager.class);
    +
    +                   ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
    --- End diff --
    
    Can't we simply use a `TestingCluster` here for all the setup work?


> Cancel with savepoint does not restart checkpoint scheduler on failure
> ----------------------------------------------------------------------
>
>                 Key: FLINK-7067
>                 URL: https://issues.apache.org/jira/browse/FLINK-7067
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.1
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> The `CancelWithSavepoint` action of the JobManager first stops the checkpoint 
> scheduler, then triggers a savepoint, and cancels the job after the savepoint 
> completes.
> If the savepoint fails, the command should have no side effects, so the job 
> is not cancelled. However, the checkpoint scheduler is not restarted in that 
> case, which leaves periodic checkpoints disabled.
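
The sequence described in the issue can be sketched in plain Java. This is only an illustration of the intended behaviour, not the JobManager code: `SketchCheckpointScheduler`, `CancelWithSavepointSketch` and all of their methods are hypothetical names invented for this example. It assumes that a failed savepoint should put the scheduler back into whatever state it was configured with, which is what the two test cases in the diff above check.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for a periodic checkpoint scheduler (not a Flink class). */
final class SketchCheckpointScheduler {
    private final boolean periodicCheckpointsConfigured;
    private boolean running;

    SketchCheckpointScheduler(boolean periodicCheckpointsConfigured) {
        this.periodicCheckpointsConfigured = periodicCheckpointsConfigured;
        // The scheduler only runs if periodic checkpoints were configured.
        this.running = periodicCheckpointsConfigured;
    }

    void stop() {
        running = false;
    }

    /** Runs again only if periodic checkpoints were configured in the first place. */
    void restoreConfiguredState() {
        running = periodicCheckpointsConfigured;
    }

    boolean isRunning() {
        return running;
    }
}

/** Hypothetical sketch of the cancel-with-savepoint flow (not the JobManager implementation). */
final class CancelWithSavepointSketch {

    /**
     * Returns true if the job would be cancelled, false if the savepoint failed.
     * The savepoint is modelled as a supplier that returns false or throws on failure.
     */
    static boolean cancelWithSavepoint(
            SketchCheckpointScheduler scheduler, BooleanSupplier triggerSavepoint) {

        // 1. Stop periodic checkpoints so none interferes with the savepoint.
        scheduler.stop();

        // 2. Trigger the savepoint; a thrown exception counts as a failure.
        boolean savepointSucceeded;
        try {
            savepointSucceeded = triggerSavepoint.getAsBoolean();
        } catch (RuntimeException e) {
            savepointSucceeded = false;
        }

        // 3. On success the job is cancelled and the scheduler stays stopped.
        if (savepointSucceeded) {
            return true;
        }

        // 4. On failure, undo the side effect from step 1. Per the issue
        //    description, this is the step that was missing.
        scheduler.restoreConfiguredState();
        return false;
    }
}
```

Calling `cancelWithSavepoint` with a savepoint that fails leaves a scheduler created with `true` running and a scheduler created with `false` stopped, so a failed request neither disables nor enables periodic checkpoints.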



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
