akalash commented on a change in pull request #16637:
URL: https://github.com/apache/flink/pull/16637#discussion_r683558429

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
      */
     private void onTriggerFailure(
             CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
-        final CheckpointException checkpointException =
-                getCheckpointException(
-                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
-        onCompletionPromise.completeExceptionally(checkpointException);
-        onTriggerFailure((PendingCheckpoint) null, checkpointException);
+        Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);

Review comment:
       This is not actually a root throwable: `stripCompletionException` only unwraps `CompletionException`. What if the IOException is hidden more than one level deep? According to the task description, I suppose we are interested in an IOException at any level of the cause chain. So, if I understand everything correctly, `ExceptionUtils.findThrowable(throwable, IOException.class).isPresent()` is the more correct construct here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -136,6 +135,8 @@ public void checkFailureCounter(CheckpointException exception, long checkpointId
                 // ignore
                 break;

+            case EXCEPTION:

Review comment:
       What is the reason for moving EXCEPTION out of the ignored cases? I don't see any hint of this in either the ticket or the tests.

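To illustrate the comment on `stripCompletionException` above, here is a minimal, self-contained sketch that does not use Flink at all. `stripCompletion` and `findInChain` are hypothetical stand-ins written for this example (they are assumptions, not the real `ExceptionUtils` methods); the point is only to show how a one-level unwrap can miss an IOException that sits deeper in the cause chain.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletionException;

public class CauseChainDemo {

    // Hypothetical stand-in: strips only leading CompletionException wrappers.
    public static Throwable stripCompletion(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Hypothetical stand-in: walks the whole cause chain looking for the given type.
    public static <T extends Throwable> Optional<T> findInChain(Throwable t, Class<T> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The IOException is two levels deep: CompletionException -> RuntimeException -> IOException.
        Throwable nested =
                new CompletionException(
                        new RuntimeException("wrapper", new IOException("disk full")));

        // The one-level unwrap stops at the RuntimeException, so the instanceof check fails.
        System.out.println(stripCompletion(nested) instanceof IOException); // prints "false"

        // The chain search still finds the IOException.
        System.out.println(findInChain(nested, IOException.class).isPresent()); // prints "true"
    }
}
```

With only a `CompletionException` directly around the IOException both checks would agree; they diverge as soon as any other wrapper sits in between.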
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
      */
     private void onTriggerFailure(
             CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
-        final CheckpointException checkpointException =
-                getCheckpointException(
-                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
-        onCompletionPromise.completeExceptionally(checkpointException);
-        onTriggerFailure((PendingCheckpoint) null, checkpointException);
+        Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);

Review comment:
       Also, why do you use the unwrapped throwable only when it is an IOException, but the original one otherwise? I think the logic should be consistent here. If we know that this throwable can be wrapped in a CompletionException, we should unwrap it in both cases (perhaps it makes sense to move the unwrapping into `getCheckpointException`).

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
##########
@@ -96,7 +96,7 @@ public void testTotalCountValue() {
         }

         // CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED and CHECKPOINT_ASYNC_EXCEPTION
-        assertEquals(3, callback.getInvokeCounter());
+        assertEquals(5, callback.getInvokeCounter());

Review comment:
       The number and the comment are inconsistent: the comment lists three failure reasons, but the assertion now expects 5.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
      */
     private void onTriggerFailure(
             CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
-        final CheckpointException checkpointException =
-                getCheckpointException(
-                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
-        onCompletionPromise.completeExceptionally(checkpointException);
-        onTriggerFailure((PendingCheckpoint) null, checkpointException);
+        Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);
+        if (rootThrowable instanceof IOException) {
+            final CheckpointException checkpointException =
+                    getCheckpointException(CheckpointFailureReason.IO_EXCEPTION, rootThrowable);
+            onCompletionPromise.completeExceptionally(checkpointException);
+            onTriggerFailure((PendingCheckpoint) null, throwable);
+        } else {
+            final CheckpointException checkpointException =
+                    getCheckpointException(
+                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+            onCompletionPromise.completeExceptionally(checkpointException);
+            onTriggerFailure((PendingCheckpoint) null, checkpointException);

Review comment:
       I don't understand why the `if` and `else` blocks differ (one passes `throwable` to `onTriggerFailure` while the other passes `checkpointException`). As I understand it, you should only choose the CheckpointFailureReason, and the rest of the code should be the same. Or did I misunderstand something?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
##########
@@ -81,6 +81,8 @@
     FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."),

+    IO_EXCEPTION(false, "Trigger checkpoint failure."),

Review comment:
       Please fix the description. Right now it is just a copy-paste of the one for `TRIGGER_CHECKPOINT_FAILURE`.

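As a rough illustration of the comment on the `if`/`else` branches of `onTriggerFailure`, the sketch below selects only the failure reason conditionally and keeps every other step on a single shared path. All types here (`Reason`, `CheckpointFailure`, `toFailure`, `hasIOExceptionCause`) are hypothetical simplifications invented for this example; they are not the Flink classes, and the real method does more than this.

```java
import java.io.IOException;
import java.util.concurrent.CompletionException;

public class TriggerFailureSketch {

    // Hypothetical, simplified stand-in for a failure-reason enum.
    public enum Reason { IO_EXCEPTION, TRIGGER_CHECKPOINT_FAILURE }

    // Hypothetical, simplified stand-in for a checkpoint exception carrying a reason.
    public static final class CheckpointFailure extends Exception {
        public final Reason reason;

        public CheckpointFailure(Reason reason, Throwable cause) {
            super(reason.name(), cause);
            this.reason = reason;
        }
    }

    // Returns true if an IOException appears anywhere in the cause chain.
    public static boolean hasIOExceptionCause(Throwable t) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (current instanceof IOException) {
                return true;
            }
        }
        return false;
    }

    // Only the reason is chosen conditionally; the wrapping is identical for both cases.
    public static CheckpointFailure toFailure(Throwable throwable) {
        Reason reason =
                hasIOExceptionCause(throwable)
                        ? Reason.IO_EXCEPTION
                        : Reason.TRIGGER_CHECKPOINT_FAILURE;
        return new CheckpointFailure(reason, throwable);
    }

    public static void main(String[] args) {
        Throwable io = new CompletionException(new IOException("disk full"));
        Throwable other = new CompletionException(new IllegalStateException("not running"));

        System.out.println(toFailure(io).reason);    // prints "IO_EXCEPTION"
        System.out.println(toFailure(other).reason); // prints "TRIGGER_CHECKPOINT_FAILURE"
    }
}
```

In this shape the caller would complete the promise and invoke the follow-up handler with the same exception object in both cases, so the two branches cannot drift apart.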
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -624,6 +625,36 @@ public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
         }
     }

+    @Test
+    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {

Review comment:
       This test checks that the task fails when it receives the new reason type, but I don't see any tests that check that your new code inside the two `onTriggerFailure` overloads works. As far as I can see, at least `testPeriodicSchedulingWithInactiveTasks`, `testTriggerCheckpointAfterCancel` and `testCheckpointAbortsIfTriggerTasksAreFinished` (and, I'm sure, many more) trigger the exception in some way. Perhaps you can take a look at how these tests work and write something similar.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -892,9 +900,12 @@ private void onTriggerFailure(@Nullable PendingCheckpoint checkpoint, Throwable
                     job,
                     numUnsuccessful,
                     throwable);
-            final CheckpointException cause =
-                    getCheckpointException(
-                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+            CheckpointFailureReason defaultReason =
+                    CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE;
+            if (throwable instanceof IOException) {

Review comment:
       The same here. Why are you so sure that the IOException would not be wrapped by other exceptions?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org