tillrohrmann commented on a change in pull request #10111: [FLINK-13969]
Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end
test fails on Travis
URL: https://github.com/apache/flink/pull/10111#discussion_r343596512
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2491,6 +2493,60 @@ public void failJobDueToTaskFailure(Throwable cause,
ExecutionAttemptID failingT
coordinator.shutdown(JobStatus.FAILING);
}
+ /**
+ * Tests that do not trigger checkpoint when stop the coordinator after
the eager pre-check.
+ */
+ @Test
+ public void testTriggerCheckpointAfterCancel() throws Exception {
+ ExecutionVertex vertex1 = mockExecutionVertex(new
ExecutionAttemptID());
+
+ // set up the coordinator
+ CheckpointCoordinatorConfiguration chkConfig = new
CheckpointCoordinatorConfiguration(
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ true,
+ false,
+ 0);
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ chkConfig,
+ new ExecutionVertex[]{vertex1},
+ new ExecutionVertex[]{vertex1},
+ new ExecutionVertex[]{vertex1},
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ manuallyTriggeredScheduledExecutor,
+ SharedStateRegistry.DEFAULT_FACTORY,
+ failureManager);
+
+ // start the coordinator
+ coord.startCheckpointScheduler();
+
+ // inject the checkpointIdCounter, so that we have passed the
eager pre-checks
+ CheckpointIDCounter idCounter = spy((CheckpointIDCounter)
Whitebox.getInternalState(coord, "checkpointIdCounter"));
+ Whitebox.setInternalState(coord, "checkpointIdCounter",
idCounter);
+ doAnswer(new Answer() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws
Throwable {
+ // stop the coordinator here to mock that the
coordinator stopped after the eager pre-checks has been done.
+ coord.stopCheckpointScheduler();
+ return (Long) invocation.callRealMethod();
+ }
+ }).when(idCounter).getAndIncrement();
+ try {
+ coord.triggerCheckpoint(System.currentTimeMillis(),
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
null, true, false);
+ fail("should not trigger periodic checkpoint after stop
the coordinator.");
+ } catch (CheckpointException e) {
+
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
e.getCheckpointFailureReason());
+ }
Review comment:
Even better would be to simply provide a `TestingCheckpointIDCounter` which
blocks when calling `CheckpointIDCounter#getAndIncrement`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services