ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368273690
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java ########## @@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws Exception { failureManager); // Periodic + final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 = coord.triggerCheckpoint( + System.currentTimeMillis(), + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + null, + true, + false); + manuallyTriggeredScheduledExecutor.triggerAll(); try { - coord.triggerCheckpoint( - System.currentTimeMillis(), - CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - null, - true, - false); - manuallyTriggeredScheduledExecutor.triggerAll(); + onCompletionPromise1.get(); fail("The triggerCheckpoint call expected an exception"); - } catch (CheckpointException e) { - assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason()); + } catch (ExecutionException e) { + final Optional<CheckpointException> checkpointExceptionOptional = + ExceptionUtils.findThrowable(e, CheckpointException.class); + assertTrue(checkpointExceptionOptional.isPresent()); + assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, + checkpointExceptionOptional.get().getCheckpointFailureReason()); } // Not periodic + final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 = coord.triggerCheckpoint( + System.currentTimeMillis(), + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + null, + false, + false); + manuallyTriggeredScheduledExecutor.triggerAll(); + assertFalse(onCompletionPromise2.isCompletedExceptionally()); + } + + @Test + public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception { + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + 
ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + + // set up the coordinator and validate the initial state + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( + 600000, + 600000, + 0, + Integer.MAX_VALUE, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + Executors.directExecutor(), + manuallyTriggeredScheduledExecutor, + SharedStateRegistry.DEFAULT_FACTORY, + failureManager); + + coord.startCheckpointScheduler(); + // Periodic + final CompletableFuture<CompletedCheckpoint> onCompletionPromise = coord.triggerCheckpoint( + System.currentTimeMillis(), + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + null, + true, + false); + + coord.shutdown(JobStatus.FAILED); + manuallyTriggeredScheduledExecutor.triggerAll(); try { - coord.triggerCheckpoint( - System.currentTimeMillis(), - CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - null, - false, - false); - manuallyTriggeredScheduledExecutor.triggerAll(); - } catch (CheckpointException e) { - fail("Unexpected exception : " + e.getCheckpointFailureReason().message()); + onCompletionPromise.get(); + fail("Should not reach here"); + } catch (ExecutionException e) { + final Optional<CheckpointException> checkpointExceptionOptional = + ExceptionUtils.findThrowable(e, CheckpointException.class); + assertTrue(checkpointExceptionOptional.isPresent()); + assertEquals(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN, + checkpointExceptionOptional.get().getCheckpointFailureReason()); } } + @Test + public void testTriggerCheckpointRequestQueued() 
throws Exception { + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + + // set up the coordinator and validate the initial state + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( + 600000, + 600000, + 0, + Integer.MAX_VALUE, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + Executors.directExecutor(), + manuallyTriggeredScheduledExecutor, + SharedStateRegistry.DEFAULT_FACTORY, + failureManager); + + coord.startCheckpointScheduler(); + // Periodic + final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 = coord.triggerCheckpoint( + System.currentTimeMillis(), + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + null, + true, + false); + assertTrue(coord.isTriggering()); + assertEquals(0, coord.getTriggerRequestQueue().size()); + + // another trigger before the prior one finished + manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks(); + assertTrue(coord.isTriggering()); + assertEquals(1, coord.getTriggerRequestQueue().size()); + + manuallyTriggeredScheduledExecutor.triggerAll(); + assertFalse(onCompletionPromise1.isCompletedExceptionally()); + assertFalse(coord.isTriggering()); + assertEquals(0, coord.getTriggerRequestQueue().size()); + } + + @Test + public void testTriggerCheckpointRequestQueuedWithFailure() throws Exception { + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID(); + final AtomicInteger taskManagerCheckpointTriggeredTimes = new AtomicInteger(0); + final SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer = + (executionAttemptID, + jobId, checkpointId, + timestamp, + checkpointOptions, + advanceToEndOfEventTime) -> taskManagerCheckpointTriggeredTimes.incrementAndGet(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1, checkpointConsumer); + + // set up the coordinator and validate the initial state + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( + 600000, + 600000, + 0, + Integer.MAX_VALUE, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new TestingCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + Executors.directExecutor(), + manuallyTriggeredScheduledExecutor, + SharedStateRegistry.DEFAULT_FACTORY, + failureManager); + + coord.startCheckpointScheduler(); + // Periodic + final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 = coord.triggerCheckpoint( Review comment: I think `onCompletionPromise1` is better here. It's not important whether it's periodic or not. The comment should be clearer. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services