ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368273690
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##########
 @@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws 
Exception {
                        failureManager);
 
                // Periodic
+               final CompletableFuture<CompletedCheckpoint> 
onCompletionPromise1 = coord.triggerCheckpoint(
+                       System.currentTimeMillis(),
+                       
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                       null,
+                       true,
+                       false);
+               manuallyTriggeredScheduledExecutor.triggerAll();
                try {
-                       coord.triggerCheckpoint(
-                               System.currentTimeMillis(),
-                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               null,
-                               true,
-                               false);
-                       manuallyTriggeredScheduledExecutor.triggerAll();
+                       onCompletionPromise1.get();
                        fail("The triggerCheckpoint call expected an 
exception");
-               } catch (CheckpointException e) {
-                       
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, 
e.getCheckpointFailureReason());
+               } catch (ExecutionException e) {
+                       final Optional<CheckpointException> 
checkpointExceptionOptional =
+                               ExceptionUtils.findThrowable(e, 
CheckpointException.class);
+                       assertTrue(checkpointExceptionOptional.isPresent());
+                       
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+                               
checkpointExceptionOptional.get().getCheckpointFailureReason());
                }
 
                // Not periodic
+               final CompletableFuture<CompletedCheckpoint> 
onCompletionPromise2 = coord.triggerCheckpoint(
+                       System.currentTimeMillis(),
+                       
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                       null,
+                       false,
+                       false);
+               manuallyTriggeredScheduledExecutor.triggerAll();
+               assertFalse(onCompletionPromise2.isCompletedExceptionally());
+       }
+
+       @Test
+       public void testTriggerCheckpointWithShuttingDownCoordinator() throws 
Exception {
+               // create some mock Execution vertices that receive the 
checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       chkConfig,
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       new MemoryStateBackend(),
+                       Executors.directExecutor(),
+                       manuallyTriggeredScheduledExecutor,
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       failureManager);
+
+               coord.startCheckpointScheduler();
+               // Periodic
+               final CompletableFuture<CompletedCheckpoint> 
onCompletionPromise = coord.triggerCheckpoint(
+                       System.currentTimeMillis(),
+                       
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                       null,
+                       true,
+                       false);
+
+               coord.shutdown(JobStatus.FAILED);
+               manuallyTriggeredScheduledExecutor.triggerAll();
                try {
-                       coord.triggerCheckpoint(
-                               System.currentTimeMillis(),
-                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               null,
-                               false,
-                               false);
-                       manuallyTriggeredScheduledExecutor.triggerAll();
-               } catch (CheckpointException e) {
-                       fail("Unexpected exception : " + 
e.getCheckpointFailureReason().message());
+                       onCompletionPromise.get();
+                       fail("Should not reach here");
+               } catch (ExecutionException e) {
+                       final Optional<CheckpointException> 
checkpointExceptionOptional =
+                               ExceptionUtils.findThrowable(e, 
CheckpointException.class);
+                       assertTrue(checkpointExceptionOptional.isPresent());
+                       
assertEquals(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN,
+                               
checkpointExceptionOptional.get().getCheckpointFailureReason());
                }
        }
 
+       @Test
+       public void testTriggerCheckpointRequestQueued() throws Exception {
+               // create some mock Execution vertices that receive the 
checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       chkConfig,
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       new MemoryStateBackend(),
+                       Executors.directExecutor(),
+                       manuallyTriggeredScheduledExecutor,
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       failureManager);
+
+               coord.startCheckpointScheduler();
+               // Periodic
+               final CompletableFuture<CompletedCheckpoint> 
onCompletionPromise1 = coord.triggerCheckpoint(
+                       System.currentTimeMillis(),
+                       
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                       null,
+                       true,
+                       false);
+               assertTrue(coord.isTriggering());
+               assertEquals(0, coord.getTriggerRequestQueue().size());
+
+               // another trigger before the prior one finished
+               
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
+               assertTrue(coord.isTriggering());
+               assertEquals(1, coord.getTriggerRequestQueue().size());
+
+               manuallyTriggeredScheduledExecutor.triggerAll();
+               assertFalse(onCompletionPromise1.isCompletedExceptionally());
+               assertFalse(coord.isTriggering());
+               assertEquals(0, coord.getTriggerRequestQueue().size());
+       }
+
+       @Test
+       public void testTriggerCheckpointRequestQueuedWithFailure() throws 
Exception {
+               // create some mock Execution vertices that receive the 
checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               final AtomicInteger taskManagerCheckpointTriggeredTimes = new 
AtomicInteger(0);
+               final SimpleAckingTaskManagerGateway.CheckpointConsumer 
checkpointConsumer =
+                       (executionAttemptID,
+                       jobId, checkpointId,
+                       timestamp,
+                       checkpointOptions,
+                       advanceToEndOfEventTime) -> 
taskManagerCheckpointTriggeredTimes.incrementAndGet();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1, 
checkpointConsumer);
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       chkConfig,
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new TestingCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       new MemoryStateBackend(),
+                       Executors.directExecutor(),
+                       manuallyTriggeredScheduledExecutor,
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       failureManager);
+
+               coord.startCheckpointScheduler();
+               // Periodic
+               final CompletableFuture<CompletedCheckpoint> 
onCompletionPromise1 = coord.triggerCheckpoint(
 
 Review comment:
   I think `onCompletionPromise1` is better here. It's not important whether 
it's periodic or not. The comment should be clearer.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to