ifndef-SleePy commented on a change in pull request #12611:
URL: https://github.com/apache/flink/pull/12611#discussion_r439302459



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
##########
@@ -529,6 +537,48 @@ public void 
testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
                assertEquals(0, 
checkpointCoordinator.getTriggerRequestQueue().size());
        }
 
+       /**
+        * This test only fails eventually.
+        */
+       @Test
+       public void 
discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws 
Exception {
+               final ExecutionVertex executionVertex = mockExecutionVertex(new 
ExecutionAttemptID());
+
+               final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+               final CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                       .setTasks(new ExecutionVertex[]{executionVertex})
+                       .setTimer(new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))
+                       
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+                               .build())
+                       .build();
+
+               final CompletableFuture<String> masterHookCheckpointFuture = 
new CompletableFuture<>();
+               final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+               checkpointCoordinator.addMasterHook(new 
TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+               try {
+                       checkpointCoordinator.triggerCheckpoint(false);
+                       final CompletableFuture<CompletedCheckpoint> 
secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+                       triggerCheckpointLatch.await();
+                       masterHookCheckpointFuture.complete("Completed");
+
+                       // discard triggering checkpoint
+                       checkpointCoordinator.abortPendingCheckpoints(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+                       try {
+                               // verify that the second checkpoint request 
will be executed and eventually times out
+                               secondCheckpoint.get();

Review comment:
       It couldn't fail on my side either (based on release-1.11). I think the 
reason is there is no asynchronous call completed after the pending checkpoint 
has been aborted.
   
   I would suggest to use `manuallyTriggeredScheduledExecutor` as timer thread 
and use a separate thread for IO executor. Because,
   1. It's closer to the real scenario and we can control the sequence of each 
invocation accurately. We can easily abort the pending checkpoint first and 
complete an IO asynchronous invocation to simulate the NPE scenario.
   2. In the later PRs, when main thread is introduced to replace timer thread 
(now it's reverted due to FLINK-16770, I'm preparing to get them back), the 
coordinator-wide lock would be removed, 
   `triggerCheckpoint` and `abortPendingCheckpoints` would be called only in 
main thread. At that time, we have to use `manuallyTriggeredScheduledExecutor` 
as main thread executor to simulate the lock-free and single-threaded threading 
model. If we do it here, we don't need change it then.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,36 +538,45 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
-                       CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
-                               .whenCompleteAsync(
-                                       (ignored, throwable) -> {
-                                               final PendingCheckpoint 
checkpoint =
-                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-                                               if (throwable == null && 
checkpoint != null && !checkpoint.isDiscarded()) {
-                                                       // no exception, no 
discarding, everything is OK
-                                                       final long checkpointId 
= checkpoint.getCheckpointId();
-                                                       snapshotTaskState(
-                                                               timestamp,
-                                                               checkpointId,
-                                                               
checkpoint.getCheckpointStorageLocation(),
-                                                               request.props,
-                                                               executions,
-                                                               
request.advanceToEndOfTime);
-
-                                                       
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
-
-                                                       onTriggerSuccess();
-                                               } else {
-                                                               // the 
initialization might not be finished yet
-                                                               if (checkpoint 
== null) {
-                                                                       
onTriggerFailure(request, throwable);
+                       FutureUtils.assertNoException(

Review comment:
       I agree that `FutureUtils.assertNoException` is a bit rough here, 
however it would be more complex to fail the job through 
`CheckpointFailureManager`. Because we have to consider two more things,
   1. Does the checkpoint is properly initialized? `CheckpointFailureManager` 
requires a checkpoint id but we might not have one.
   2. `isTriggering` must be taken care of. Failing job does not reset this 
flag.
   
   So `FutureUtils.assertNoException` is acceptable for me. Technically 
speaking, it should never be reached unless there is a bug of our code or some 
other extreme corner cases, like OOM.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to