akalash commented on a change in pull request #16637:
URL: https://github.com/apache/flink/pull/16637#discussion_r683558429



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
      */
     private void onTriggerFailure(
             CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
-        final CheckpointException checkpointException =
-                getCheckpointException(
-                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
-        onCompletionPromise.completeExceptionally(checkpointException);
-        onTriggerFailure((PendingCheckpoint) null, checkpointException);
+        Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);

Review comment:
       It is actually not a root throwable: `stripCompletionException` only unwraps `CompletionException`. But what if the `IOException` is hidden deeper than one level? According to the task description, I suppose we are interested in an `IOException` at any cause level. So, if I understand everything correctly, `ExceptionUtils.findThrowable(throwable, IOException.class).isPresent()` is the more correct construction here.
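For illustration, here is a minimal, self-contained sketch of the difference. `stripCompletion` and `findCause` are hypothetical helpers written only for this example. They assume `stripCompletionException` peels off `CompletionException` layers and nothing else, and that `findThrowable` searches the whole cause chain. They are not Flink's actual `ExceptionUtils` code.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletionException;

/** Hypothetical helpers mimicking the two unwrapping strategies (not Flink's ExceptionUtils). */
final class CauseChainSketch {

    /** Peels off CompletionException layers only and stops at the first other type. */
    static Throwable stripCompletion(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /** Walks the whole cause chain and returns the first throwable of the given type. */
    static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
        Throwable current = t;
        int depth = 0;
        // Bound the walk so a cyclic cause chain cannot loop forever.
        while (current != null && depth++ < 100) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Throwable wrapped =
                new CompletionException(new RuntimeException(new IOException("disk full")));
        // Stripping stops at the RuntimeException, so an instanceof check misses the IOException.
        System.out.println(stripCompletion(wrapped) instanceof IOException); // false
        // Searching the full cause chain finds it.
        System.out.println(findCause(wrapped, IOException.class).isPresent()); // true
    }
}
```

In this sketch, an `IOException` wrapped by anything other than `CompletionException` is invisible to the strip-then-`instanceof` check but is found by the cause-chain search.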

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -136,6 +135,8 @@ public void checkFailureCounter(CheckpointException exception, long checkpointId
                 // ignore
                 break;
 
+            case EXCEPTION:

Review comment:
       What is the reason for moving `EXCEPTION` out of the ignored cases? I don't see any hint of this either in the ticket or in the tests.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
      */
     private void onTriggerFailure(
             CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
-        final CheckpointException checkpointException =
-                getCheckpointException(
-                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
-        onCompletionPromise.completeExceptionally(checkpointException);
-        onTriggerFailure((PendingCheckpoint) null, checkpointException);
+        Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);

Review comment:
       Also, why do you use the unwrapped throwable only if it is an `IOException`, but the initial one otherwise? I think the logic should be consistent here. If we know that this throwable can be wrapped in a `CompletionException`, we should unwrap it in both cases (perhaps it makes sense to move the unwrapping into `getCheckpointException`).
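A minimal sketch of moving the unwrapping into the factory, so both branches hand over the original throwable and get the same treatment. `Reason`, `CheckpointExceptionStub` and `toCheckpointException` are hypothetical stand-ins for `CheckpointFailureReason`, `CheckpointException` and `getCheckpointException`. They do not reproduce the real Flink signatures.

```java
import java.io.IOException;
import java.util.concurrent.CompletionException;

/** Hypothetical sketch: unwrapping happens once, inside the exception factory. */
final class UnwrapInFactorySketch {

    /** Simplified stand-in for CheckpointFailureReason (only the two values discussed). */
    enum Reason { TRIGGER_CHECKPOINT_FAILURE, IO_EXCEPTION }

    /** Simplified stand-in for CheckpointException: a reason plus a cause. */
    static final class CheckpointExceptionStub extends Exception {
        final Reason reason;

        CheckpointExceptionStub(Reason reason, Throwable cause) {
            super(reason.name(), cause);
            this.reason = reason;
        }
    }

    /**
     * Stand-in for getCheckpointException: strips CompletionException layers itself,
     * so callers always pass the original throwable, whatever the reason.
     */
    static CheckpointExceptionStub toCheckpointException(Reason reason, Throwable throwable) {
        Throwable unwrapped = throwable;
        while (unwrapped instanceof CompletionException && unwrapped.getCause() != null) {
            unwrapped = unwrapped.getCause();
        }
        return new CheckpointExceptionStub(reason, unwrapped);
    }

    public static void main(String[] args) {
        Throwable io = new CompletionException(new IOException("disk full"));
        Throwable other = new CompletionException(new IllegalStateException("boom"));
        // Both branches see the unwrapped cause, which keeps the logic consistent.
        System.out.println(toCheckpointException(Reason.IO_EXCEPTION, io).getCause());
        System.out.println(
                toCheckpointException(Reason.TRIGGER_CHECKPOINT_FAILURE, other).getCause());
    }
}
```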

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
##########
@@ -96,7 +96,7 @@ public void testTotalCountValue() {
         }
 
         // CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED and CHECKPOINT_ASYNC_EXCEPTION
-        assertEquals(3, callback.getInvokeCounter());
+        assertEquals(5, callback.getInvokeCounter());

Review comment:
       The number and the comment are inconsistent: the comment lists three reasons, but the assertion now expects 5.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
      */
     private void onTriggerFailure(
             CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
-        final CheckpointException checkpointException =
-                getCheckpointException(
-                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
-        onCompletionPromise.completeExceptionally(checkpointException);
-        onTriggerFailure((PendingCheckpoint) null, checkpointException);
+        Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);
+        if (rootThrowable instanceof IOException) {
+            final CheckpointException checkpointException =
+                    getCheckpointException(CheckpointFailureReason.IO_EXCEPTION, rootThrowable);
+            onCompletionPromise.completeExceptionally(checkpointException);
+            onTriggerFailure((PendingCheckpoint) null, throwable);
+        } else {
+            final CheckpointException checkpointException =
+                    getCheckpointException(
+                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+            onCompletionPromise.completeExceptionally(checkpointException);
+            onTriggerFailure((PendingCheckpoint) null, checkpointException);

Review comment:
       I don't understand why the `if` and `else` blocks differ (one passes `throwable` to `onTriggerFailure`, while the other passes `checkpointException`). As I understand it, you only need to choose the `CheckpointFailureReason`; the rest of the code should be the same. Or did I misunderstand something?
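A minimal sketch of the single code path, where the only branch-dependent part is the reason. All names here are hypothetical stand-ins: `Reason` and `CheckpointExceptionStub` replace the Flink types, and the `Consumer` parameter replaces the second `onTriggerFailure` overload. The reason check walks the whole cause chain, in line with the earlier comment about deeper wrapping.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

/** Hypothetical sketch: the branches differ only in the chosen reason. */
final class SingleFailurePathSketch {

    enum Reason { TRIGGER_CHECKPOINT_FAILURE, IO_EXCEPTION }

    /** Simplified stand-in for CheckpointException. */
    static final class CheckpointExceptionStub extends Exception {
        final Reason reason;

        CheckpointExceptionStub(Reason reason, Throwable cause) {
            super(reason.name(), cause);
            this.reason = reason;
        }
    }

    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        // Bounded walk over the cause chain, so a cyclic chain cannot loop forever.
        int depth = 0;
        for (Throwable c = t; c != null && depth < 100; c = c.getCause(), depth++) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    static void onTriggerFailure(
            CompletableFuture<?> promise,
            Throwable throwable,
            Consumer<CheckpointExceptionStub> failureHandler) {
        // Only the reason depends on the exception type...
        Reason reason =
                hasCause(throwable, IOException.class)
                        ? Reason.IO_EXCEPTION
                        : Reason.TRIGGER_CHECKPOINT_FAILURE;
        // ...everything else is one shared path: the same exception goes to both consumers.
        CheckpointExceptionStub checkpointException = new CheckpointExceptionStub(reason, throwable);
        promise.completeExceptionally(checkpointException);
        failureHandler.accept(checkpointException);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        onTriggerFailure(
                promise,
                new CompletionException(new IOException("disk full")),
                e -> System.out.println("handled: " + e.reason));
        System.out.println(promise.isCompletedExceptionally());
    }
}
```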

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
##########
@@ -81,6 +81,8 @@
 
     FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."),
 
+    IO_EXCEPTION(false, "Trigger checkpoint failure."),

Review comment:
       Please fix the description. Right now it is just a copy-paste of the `TRIGGER_CHECKPOINT_FAILURE` description.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -624,6 +625,36 @@ public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
         }
     }
 
+    @Test
+    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {

Review comment:
       This test checks that the task fails when it receives the new reason type, but I don't see tests that check that your new code inside both `onTriggerFailure` overloads works.
   As far as I can see, at least `testPeriodicSchedulingWithInactiveTasks`, `testTriggerCheckpointAfterCancel` and `testCheckpointAbortsIfTriggerTasksAreFinished` (and I'm sure many more) trigger the exception somehow. Perhaps you can take a look at how these tests work and write something similar.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -892,9 +900,12 @@ private void onTriggerFailure(@Nullable PendingCheckpoint checkpoint, Throwable
                         job,
                         numUnsuccessful,
                         throwable);
-                final CheckpointException cause =
-                        getCheckpointException(
-                                CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+                CheckpointFailureReason defaultReason =
+                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE;
+                if (throwable instanceof IOException) {

Review comment:
       Same here: why are you so sure that the `IOException` will not be wrapped by other exceptions?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
