rkhachatryan commented on a change in pull request #12611:
URL: https://github.com/apache/flink/pull/12611#discussion_r438872294



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,36 +538,45 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
-                       CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
-                               .whenCompleteAsync(
-                                       (ignored, throwable) -> {
-                                               final PendingCheckpoint 
checkpoint =
-                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-                                               if (throwable == null && 
checkpoint != null && !checkpoint.isDiscarded()) {
-                                                       // no exception, no 
discarding, everything is OK
-                                                       final long checkpointId 
= checkpoint.getCheckpointId();
-                                                       snapshotTaskState(
-                                                               timestamp,
-                                                               checkpointId,
-                                                               
checkpoint.getCheckpointStorageLocation(),
-                                                               request.props,
-                                                               executions,
-                                                               
request.advanceToEndOfTime);
-
-                                                       
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
-
-                                                       onTriggerSuccess();
-                                               } else {
-                                                               // the 
initialization might not be finished yet
-                                                               if (checkpoint 
== null) {
-                                                                       
onTriggerFailure(request, throwable);
+                       FutureUtils.assertNoException(
+                               CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
+                                       .whenCompleteAsync(
+                                               (ignored, throwable) -> {
+                                                       final PendingCheckpoint 
checkpoint =
+                                                               
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+                                                       if (throwable == null 
&& checkpoint != null) {
+                                                               if 
(checkpoint.isDiscarded()) {
+                                                                       
onTriggerFailure(
+                                                                               
checkpoint,
+                                                                               
new CheckpointException(
+                                                                               
        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,

Review comment:
       I think `CHECKPOINT_DECLINED` would be more appropriate here because at 
this point checkpoint was already triggered.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
##########
@@ -529,6 +537,48 @@ public void 
testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
                assertEquals(0, 
checkpointCoordinator.getTriggerRequestQueue().size());
        }
 
+       /**
+        * This test only fails eventually.
+        */
+       @Test
+       public void 
discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws 
Exception {
+               final ExecutionVertex executionVertex = mockExecutionVertex(new 
ExecutionAttemptID());
+
+               final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+               final CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                       .setTasks(new ExecutionVertex[]{executionVertex})
+                       .setTimer(new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))
+                       
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+                               .build())
+                       .build();
+
+               final CompletableFuture<String> masterHookCheckpointFuture = 
new CompletableFuture<>();
+               final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+               checkpointCoordinator.addMasterHook(new 
TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+               try {
+                       checkpointCoordinator.triggerCheckpoint(false);
+                       final CompletableFuture<CompletedCheckpoint> 
secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+                       triggerCheckpointLatch.await();
+                       masterHookCheckpointFuture.complete("Completed");
+
+                       // discard triggering checkpoint
+                       checkpointCoordinator.abortPendingCheckpoints(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+                       try {
+                               // verify that the second checkpoint request 
will be executed and eventually times out
+                               secondCheckpoint.get();

Review comment:
       I guess I'm missing some changes because I see only one checkpoint in 
this test.
   
   I think there indeed should be a second checkpoint that is started after the 
previous one is aborted. We can simulate the original problem without a latch 
by completing `TestingMasterHook.checkpointFuture` after aborting the 
checkpoint.
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,36 +538,45 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
-                       CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
-                               .whenCompleteAsync(
-                                       (ignored, throwable) -> {
-                                               final PendingCheckpoint 
checkpoint =
-                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-                                               if (throwable == null && 
checkpoint != null && !checkpoint.isDiscarded()) {
-                                                       // no exception, no 
discarding, everything is OK
-                                                       final long checkpointId 
= checkpoint.getCheckpointId();
-                                                       snapshotTaskState(
-                                                               timestamp,
-                                                               checkpointId,
-                                                               
checkpoint.getCheckpointStorageLocation(),
-                                                               request.props,
-                                                               executions,
-                                                               
request.advanceToEndOfTime);
-
-                                                       
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
-
-                                                       onTriggerSuccess();
-                                               } else {
-                                                               // the 
initialization might not be finished yet
-                                                               if (checkpoint 
== null) {
-                                                                       
onTriggerFailure(request, throwable);
+                       FutureUtils.assertNoException(

Review comment:
       Won't this kill JVM on _any_ exception regardless of whether it was 
handled below or not? 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,36 +538,45 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
-                       CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
-                               .whenCompleteAsync(
-                                       (ignored, throwable) -> {
-                                               final PendingCheckpoint 
checkpoint =
-                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-                                               if (throwable == null && 
checkpoint != null && !checkpoint.isDiscarded()) {
-                                                       // no exception, no 
discarding, everything is OK
-                                                       final long checkpointId 
= checkpoint.getCheckpointId();
-                                                       snapshotTaskState(
-                                                               timestamp,
-                                                               checkpointId,
-                                                               
checkpoint.getCheckpointStorageLocation(),
-                                                               request.props,
-                                                               executions,
-                                                               
request.advanceToEndOfTime);
-
-                                                       
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
-
-                                                       onTriggerSuccess();
-                                               } else {
-                                                               // the 
initialization might not be finished yet
-                                                               if (checkpoint 
== null) {
-                                                                       
onTriggerFailure(request, throwable);
+                       FutureUtils.assertNoException(
+                               CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
+                                       .whenCompleteAsync(
+                                               (ignored, throwable) -> {
+                                                       final PendingCheckpoint 
checkpoint =
+                                                               
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+                                                       if (throwable == null 
&& checkpoint != null) {
+                                                               if 
(checkpoint.isDiscarded()) {
+                                                                       
onTriggerFailure(
+                                                                               
checkpoint,
+                                                                               
new CheckpointException(
+                                                                               
        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                                                                               
        checkpoint.getFailureCause()));
                                                                } else {
-                                                                       
onTriggerFailure(checkpoint, throwable);
+                                                                       // no 
exception, no discarding, everything is OK
+                                                                       final 
long checkpointId = checkpoint.getCheckpointId();
+                                                                       
snapshotTaskState(
+                                                                               
timestamp,
+                                                                               
checkpointId,
+                                                                               
checkpoint.getCheckpointStorageLocation(),
+                                                                               
request.props,
+                                                                               
executions,
+                                                                               
request.advanceToEndOfTime);
+
+                                                                       
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
+
+                                                                       
onTriggerSuccess();
                                                                }
-                                               }
-                                       },
-                                       timer);
+                                                       } else {
+                                                                       // the 
initialization might not be finished yet
+                                                                       if 
(checkpoint == null) {
+                                                                               
onTriggerFailure(request, throwable);
+                                                                       } else {
+                                                                               
onTriggerFailure(checkpoint, throwable);

Review comment:
       nit: explicit `else if (throwable == null && checkpoint == null)` branch 
would be more clear to me personally. As it is now, it implicitly relies on 
error handling above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to