pnowojski commented on a change in pull request #12670:
URL: https://github.com/apache/flink/pull/12670#discussion_r441280357



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,51 +542,61 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
-                       FutureUtils.assertNoException(
-                               CompletableFuture.allOf(masterStatesComplete, 
coordinatorCheckpointsComplete)
-                                       .handleAsync(
-                                               (ignored, throwable) -> {
-                                                       final PendingCheckpoint 
checkpoint =
-                                                               
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-                                                       
Preconditions.checkState(
-                                                               checkpoint != 
null || throwable != null,
-                                                               "Either the 
pending checkpoint needs to be created or an error must have been occurred.");
-
-                                                       if (throwable != null) {
-                                                               // the 
initialization might not be finished yet
-                                                               if (checkpoint 
== null) {
-                                                                       
onTriggerFailure(request, throwable);
-                                                               } else {
-                                                                       
onTriggerFailure(checkpoint, throwable);
-                                                               }
+                       FutureUtils.waitForAll(asList(masterStatesComplete, 
coordinatorCheckpointsComplete))
+                               .handleAsync(
+                                       (ignored, throwable) -> {
+                                               final PendingCheckpoint 
checkpoint =
+                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+                                               Preconditions.checkState(
+                                                       checkpoint != null || 
throwable != null,
+                                                       "Either the pending 
checkpoint needs to be created or an error must have been occurred.");
+
+                                               if (throwable != null) {
+                                                       // the initialization 
might not be finished yet
+                                                       if (checkpoint == null) 
{
+                                                               
onTriggerFailure(request, throwable);
                                                        } else {
-                                                               if 
(checkpoint.isDiscarded()) {
-                                                                       
onTriggerFailure(
-                                                                               
checkpoint,
-                                                                               
new CheckpointException(
-                                                                               
        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-                                                                               
        checkpoint.getFailureCause()));
-                                                               } else {
-                                                                       // no 
exception, no discarding, everything is OK
-                                                                       final 
long checkpointId = checkpoint.getCheckpointId();
-                                                                       
snapshotTaskState(
-                                                                               
timestamp,
-                                                                               
checkpointId,
-                                                                               
checkpoint.getCheckpointStorageLocation(),
-                                                                               
request.props,
-                                                                               
executions,
-                                                                               
request.advanceToEndOfTime);
-
-                                                                       
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
-
-                                                                       
onTriggerSuccess();
-                                                               }
+                                                               
onTriggerFailure(checkpoint, throwable);
                                                        }
+                                               } else {
+                                                       if 
(checkpoint.isDiscarded()) {
+                                                               
onTriggerFailure(
+                                                                       
checkpoint,
+                                                                       new 
CheckpointException(
+                                                                               
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                                                                               
checkpoint.getFailureCause()));
+                                                       } else {
+                                                               // no 
exception, no discarding, everything is OK
+                                                               final long 
checkpointId = checkpoint.getCheckpointId();
+                                                               
snapshotTaskState(
+                                                                       
timestamp,
+                                                                       
checkpointId,
+                                                                       
checkpoint.getCheckpointStorageLocation(),
+                                                                       
request.props,
+                                                                       
executions,
+                                                                       
request.advanceToEndOfTime);
+
+                                                               
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
+
+                                                               
onTriggerSuccess();
+                                                       }
+                                               }
 
-                                                       return null;
-                                               },
-                                               timer));
+                                               return null;
+                                       },
+                                       timer)
+                               .whenComplete((unused, error) -> {
+                                       if (error != null) {
+                                               if (!isShutdown()) {
+                                                       
failureManager.handleJobLevelCheckpointException(new 
CheckpointException(EXCEPTION, error), Optional.empty());

Review comment:
       I think doing `System.exit` is not nice practice in general. Flushing 
logs is one thing, but there might be some other exception handling code that 
wouldn't be execute as expected. Maybe some production code, like cleaning up 
temp files, maybe some debug code someone was counting on. Also it's confusing, 
as it's not the normal/usual way how Flink fails (with `System.exit` I guess 
there would be no message in the error log, but only on stderr?).
   
   In many other places we have `checkState`s for programming errors.
   
   If failing single job is not enough, we should fail whole JM, but via some 
exception. However I'm not buying the argument why should we fail other jobs as 
`CheckpointCoordinator` is bound to a single job. For example we are not 
failing whole TM if there was a `checkState` or `NPE` failure in the runtime 
code of a single job.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to