pnowojski commented on a change in pull request #16637:
URL: https://github.com/apache/flink/pull/16637#discussion_r688922304
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -626,6 +627,194 @@ public void
testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
}
}
+ @Test
+ public void testIOExceptionCheckpointExceedsTolerableFailureNumber()
throws Exception {
+ // create some mock Execution vertices that receive the checkpoint
trigger messages
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(new JobVertexID())
+ .addJobVertex(new JobVertexID())
+ .build();
+
+ final String errorMsg = "Exceeded checkpoint failure tolerance
number!";
+ CheckpointFailureManager checkpointFailureManager =
getCheckpointFailureManager(errorMsg);
+ CheckpointCoordinator checkpointCoordinator =
+ getCheckpointCoordinator(graph, checkpointFailureManager);
+
+ try {
+ checkpointCoordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ checkpointCoordinator.abortPendingCheckpoints(new
CheckpointException(IO_EXCEPTION));
+
+ fail("Test failed.");
+ } catch (Exception e) {
+ // expected
+ assertTrue(e instanceof RuntimeException);
+ assertEquals(errorMsg, e.getMessage());
+ } finally {
+ checkpointCoordinator.shutdown();
+ }
+ }
+
+ @Test
+ public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() {
+ TestingIOExceptionCheckpointIDCounter idCounter =
+ new TestingIOExceptionCheckpointIDCounter();
+
+ try {
+ JobVertexID jobVertexID1 = new JobVertexID();
+
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID1)
+ .setTransitToRunning(false)
+ .build();
+
+ ExecutionVertex vertex1 =
graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
+
+ CheckpointCoordinatorConfiguration chkConfig =
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointInterval(10) // periodic interval is
10 ms
+ .setCheckpointTimeout(200000) // timeout is very
long (200 s)
+ .setMinPauseBetweenCheckpoints(0) // no extra delay
+ .setMaxConcurrentCheckpoints(2) // max two
concurrent checkpoints
+ .build();
+ CheckpointCoordinator checkpointCoordinator =
+ new CheckpointCoordinatorBuilder()
+ .setExecutionGraph(graph)
+ .setCheckpointCoordinatorConfiguration(chkConfig)
+ .setCompletedCheckpointStore(new
StandaloneCompletedCheckpointStore(2))
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .setCheckpointIDCounter(idCounter)
+ .build();
+ idCounter.setOwner(checkpointCoordinator);
+
+ checkpointCoordinator.startCheckpointScheduler();
+
+ manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
+ manuallyTriggeredScheduledExecutor.triggerAll();
+ // no checkpoint should have started so far
+ assertEquals(0,
checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+ // now move the state to RUNNING
+
vertex1.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
+
+ final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
+ checkpointCoordinator.triggerCheckpoint(
+ CheckpointProperties.forCheckpoint(
+
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ try {
+ onCompletionPromise.get();
+ fail("should not trigger periodic checkpoint after IOException
occurred.");
+ } catch (Exception e) {
+ final Optional<CheckpointException>
checkpointExceptionOptional =
+ ExceptionUtils.findThrowable(e,
CheckpointException.class);
+ assertTrue(checkpointExceptionOptional.isPresent());
+ assertEquals(
+ IO_EXCEPTION,
+
checkpointExceptionOptional.get().getCheckpointFailureReason());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /** Tests that do not trigger checkpoint when IOException occurred. */
+ @Test
+ public void testTriggerCheckpointAfterIOException() throws Exception {
Review comment:
This test duplicates code from `testTriggerCheckpointAfterCancel`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -626,6 +627,194 @@ public void
testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
}
}
+ @Test
+ public void testIOExceptionCheckpointExceedsTolerableFailureNumber()
throws Exception {
+ // create some mock Execution vertices that receive the checkpoint
trigger messages
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(new JobVertexID())
+ .addJobVertex(new JobVertexID())
+ .build();
+
+ final String errorMsg = "Exceeded checkpoint failure tolerance
number!";
+ CheckpointFailureManager checkpointFailureManager =
getCheckpointFailureManager(errorMsg);
+ CheckpointCoordinator checkpointCoordinator =
+ getCheckpointCoordinator(graph, checkpointFailureManager);
+
+ try {
+ checkpointCoordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ checkpointCoordinator.abortPendingCheckpoints(new
CheckpointException(IO_EXCEPTION));
+
+ fail("Test failed.");
+ } catch (Exception e) {
+ // expected
+ assertTrue(e instanceof RuntimeException);
+ assertEquals(errorMsg, e.getMessage());
+ } finally {
+ checkpointCoordinator.shutdown();
+ }
+ }
+
+ @Test
+ public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() {
Review comment:
This test duplicates a lot of code from
`testPeriodicSchedulingWithInactiveTasks()`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -626,6 +627,194 @@ public void
testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
}
}
+ @Test
+ public void testIOExceptionCheckpointExceedsTolerableFailureNumber()
throws Exception {
+ // create some mock Execution vertices that receive the checkpoint
trigger messages
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(new JobVertexID())
+ .addJobVertex(new JobVertexID())
+ .build();
+
+ final String errorMsg = "Exceeded checkpoint failure tolerance
number!";
+ CheckpointFailureManager checkpointFailureManager =
getCheckpointFailureManager(errorMsg);
+ CheckpointCoordinator checkpointCoordinator =
+ getCheckpointCoordinator(graph, checkpointFailureManager);
+
+ try {
+ checkpointCoordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ checkpointCoordinator.abortPendingCheckpoints(new
CheckpointException(IO_EXCEPTION));
+
+ fail("Test failed.");
+ } catch (Exception e) {
+ // expected
+ assertTrue(e instanceof RuntimeException);
+ assertEquals(errorMsg, e.getMessage());
Review comment:
Please never hide the original exception. This code should look like
this:
```
if (!ExceptionUtils.findThrowableWithMessage(e, errorMsg).isPresent()) {
    throw e;
}
```
This way, if an unexpected error happens, we can immediately see what the
actual exception was, along with its stack trace.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]