pnowojski commented on a change in pull request #16637:
URL: https://github.com/apache/flink/pull/16637#discussion_r688922304



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -626,6 +627,194 @@ public void 
testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
         }
     }
 
+    @Test
+    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() 
throws Exception {
+        // create some mock Execution vertices that receive the checkpoint 
trigger messages
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(new JobVertexID())
+                        .addJobVertex(new JobVertexID())
+                        .build();
+
+        final String errorMsg = "Exceeded checkpoint failure tolerance 
number!";
+        CheckpointFailureManager checkpointFailureManager = 
getCheckpointFailureManager(errorMsg);
+        CheckpointCoordinator checkpointCoordinator =
+                getCheckpointCoordinator(graph, checkpointFailureManager);
+
+        try {
+            checkpointCoordinator.triggerCheckpoint(false);
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            checkpointCoordinator.abortPendingCheckpoints(new 
CheckpointException(IO_EXCEPTION));
+
+            fail("Test failed.");
+        } catch (Exception e) {
+            // expected
+            assertTrue(e instanceof RuntimeException);
+            assertEquals(errorMsg, e.getMessage());
+        } finally {
+            checkpointCoordinator.shutdown();
+        }
+    }
+
+    @Test
+    public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() {
+        TestingIOExceptionCheckpointIDCounter idCounter =
+                new TestingIOExceptionCheckpointIDCounter();
+
+        try {
+            JobVertexID jobVertexID1 = new JobVertexID();
+
+            ExecutionGraph graph =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                            .addJobVertex(jobVertexID1)
+                            .setTransitToRunning(false)
+                            .build();
+
+            ExecutionVertex vertex1 = 
graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
+
+            CheckpointCoordinatorConfiguration chkConfig =
+                    new CheckpointCoordinatorConfiguration
+                                    
.CheckpointCoordinatorConfigurationBuilder()
+                            .setCheckpointInterval(10) // periodic interval is 
10 ms
+                            .setCheckpointTimeout(200000) // timeout is very 
long (200 s)
+                            .setMinPauseBetweenCheckpoints(0) // no extra delay
+                            .setMaxConcurrentCheckpoints(2) // max two 
concurrent checkpoints
+                            .build();
+            CheckpointCoordinator checkpointCoordinator =
+                    new CheckpointCoordinatorBuilder()
+                            .setExecutionGraph(graph)
+                            .setCheckpointCoordinatorConfiguration(chkConfig)
+                            .setCompletedCheckpointStore(new 
StandaloneCompletedCheckpointStore(2))
+                            .setTimer(manuallyTriggeredScheduledExecutor)
+                            .setCheckpointIDCounter(idCounter)
+                            .build();
+            idCounter.setOwner(checkpointCoordinator);
+
+            checkpointCoordinator.startCheckpointScheduler();
+
+            manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
+            manuallyTriggeredScheduledExecutor.triggerAll();
+            // no checkpoint should have started so far
+            assertEquals(0, 
checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+            // now move the state to RUNNING
+            
vertex1.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
+
+            final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
+                    checkpointCoordinator.triggerCheckpoint(
+                            CheckpointProperties.forCheckpoint(
+                                    
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                            null,
+                            true);
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            try {
+                onCompletionPromise.get();
+                fail("should not trigger periodic checkpoint after IOException 
occurred.");
+            } catch (Exception e) {
+                final Optional<CheckpointException> 
checkpointExceptionOptional =
+                        ExceptionUtils.findThrowable(e, 
CheckpointException.class);
+                assertTrue(checkpointExceptionOptional.isPresent());
+                assertEquals(
+                        IO_EXCEPTION,
+                        
checkpointExceptionOptional.get().getCheckpointFailureReason());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    /** Tests that do not trigger checkpoint when IOException occurred. */
+    @Test
+    public void testTriggerCheckpointAfterIOException() throws Exception {

Review comment:
       This test duplicates code from `testTriggerCheckpointAfterCancel`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -626,6 +627,194 @@ public void 
testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
         }
     }
 
+    @Test
+    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() 
throws Exception {
+        // create some mock Execution vertices that receive the checkpoint 
trigger messages
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(new JobVertexID())
+                        .addJobVertex(new JobVertexID())
+                        .build();
+
+        final String errorMsg = "Exceeded checkpoint failure tolerance 
number!";
+        CheckpointFailureManager checkpointFailureManager = 
getCheckpointFailureManager(errorMsg);
+        CheckpointCoordinator checkpointCoordinator =
+                getCheckpointCoordinator(graph, checkpointFailureManager);
+
+        try {
+            checkpointCoordinator.triggerCheckpoint(false);
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            checkpointCoordinator.abortPendingCheckpoints(new 
CheckpointException(IO_EXCEPTION));
+
+            fail("Test failed.");
+        } catch (Exception e) {
+            // expected
+            assertTrue(e instanceof RuntimeException);
+            assertEquals(errorMsg, e.getMessage());
+        } finally {
+            checkpointCoordinator.shutdown();
+        }
+    }
+
+    @Test
+    public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() {

Review comment:
       This test duplicates a lot of code from `testPeriodicSchedulingWithInactiveTasks()`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -626,6 +627,194 @@ public void 
testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
         }
     }
 
+    @Test
+    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() 
throws Exception {
+        // create some mock Execution vertices that receive the checkpoint 
trigger messages
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(new JobVertexID())
+                        .addJobVertex(new JobVertexID())
+                        .build();
+
+        final String errorMsg = "Exceeded checkpoint failure tolerance 
number!";
+        CheckpointFailureManager checkpointFailureManager = 
getCheckpointFailureManager(errorMsg);
+        CheckpointCoordinator checkpointCoordinator =
+                getCheckpointCoordinator(graph, checkpointFailureManager);
+
+        try {
+            checkpointCoordinator.triggerCheckpoint(false);
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            checkpointCoordinator.abortPendingCheckpoints(new 
CheckpointException(IO_EXCEPTION));
+
+            fail("Test failed.");
+        } catch (Exception e) {
+            // expected
+            assertTrue(e instanceof RuntimeException);
+            assertEquals(errorMsg, e.getMessage());

Review comment:
       Please never hide the original exception. This code should look like this:
   ```
   if (!ExceptionUtils.findThrowableWithMessage(e, errorMsg).isPresent()) {
     throw e;
   }
   ```
   
   This way, if an unexpected error happens, we can immediately see the actual exception and its stack trace.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to