[
https://issues.apache.org/jira/browse/FLINK-7067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212449#comment-16212449
]
ASF GitHub Bot commented on FLINK-7067:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/4254#discussion_r145923787
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
}
/**
+ * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
+ * periodic checkpoints.
+ */
+ @Test
+ public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
+ testCancelJobWithSavepointFailure(true);
+ }
+
+ /**
+ * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
+ * periodic checkpoints.
+ */
+ @Test
+ public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
+ testCancelJobWithSavepointFailure(false);
+ }
+
+ /**
+ * Tests that a failed savepoint does not cancel the job and that there are no
+ * unintended side effects.
+ *
+ * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
+ * need to test both here in order to verify that we don't accidentally disable or enable
+ * checkpoints after a failed cancel-job-with-savepoint request.
+ */
+ private void testCancelJobWithSavepointFailure(
+ boolean enablePeriodicCheckpoints) throws Exception {
+
+ long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
+
+ // Savepoint target
+ File savepointTarget = tmpFolder.newFolder();
+ savepointTarget.deleteOnExit();
+
+ // Timeout for Akka messages
+ FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+ // A source that declines savepoints, simulating the behaviour
+ // of a failed savepoint.
+ JobVertex sourceVertex = new JobVertex("Source");
+ sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
+ sourceVertex.setParallelism(1);
+ JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+ final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+ try {
+ Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+ new Configuration(),
+ actorSystem,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
+ Option.apply("jm"),
+ Option.apply("arch"),
+ TestingJobManager.class,
+ TestingMemoryArchivist.class);
+
+ UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT());
+
+ ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
+
+ ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+ new Configuration(),
+ ResourceID.generate(),
+ actorSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.apply("tm"),
+ true,
+ TestingTaskManager.class);
+
+ ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
--- End diff --
Definitely +1
> Cancel with savepoint does not restart checkpoint scheduler on failure
> ----------------------------------------------------------------------
>
> Key: FLINK-7067
> URL: https://issues.apache.org/jira/browse/FLINK-7067
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.1
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> The `CancelWithSavepoint` action of the JobManager first stops the checkpoint
> scheduler, then triggers a savepoint, and cancels the job after the savepoint
> completes.
> If the savepoint fails, the command should not have any side effects, so we
> don't cancel the job. However, the checkpoint scheduler is not restarted in
> that case, so periodic checkpoints stay disabled.
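
The behaviour described in the issue, and the fix the tests in the diff are checking for, can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink's actual implementation: `SketchCheckpointScheduler`, `SketchJob`, and all of their methods are hypothetical names invented for this example, and the savepoint is modelled as a plain `CompletableFuture<String>`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the checkpoint scheduler; not a Flink class.
class SketchCheckpointScheduler {
    private final boolean periodic;
    private boolean running;

    SketchCheckpointScheduler(boolean periodic) {
        this.periodic = periodic;
        this.running = periodic; // only runs if periodic checkpoints are configured
    }

    void stop() {
        running = false;
    }

    // Restart only if periodic checkpoints were configured in the first place,
    // so a failed request neither disables nor enables them by accident.
    void startIfPeriodic() {
        if (periodic) {
            running = true;
        }
    }

    boolean isRunning() {
        return running;
    }
}

// Hypothetical stand-in for the job; not a Flink class.
class SketchJob {
    final SketchCheckpointScheduler scheduler;
    private boolean cancelled;

    SketchJob(SketchCheckpointScheduler scheduler) {
        this.scheduler = scheduler;
    }

    // Stop the scheduler, wait for the savepoint, then either cancel the job
    // (success) or undo the side effect by restarting the scheduler (failure).
    void cancelWithSavepoint(CompletableFuture<String> savepoint) {
        scheduler.stop();
        savepoint.whenComplete((path, failure) -> {
            if (failure == null) {
                cancelled = true;
            } else {
                scheduler.startIfPeriodic();
            }
        });
    }

    boolean isCancelled() {
        return cancelled;
    }
}
```

The two test cases in the diff correspond to constructing the hypothetical scheduler with `periodic` set to `true` or `false`: after a failed savepoint the job should still be running in both cases, and the scheduler should be back in whatever state it was configured for.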