XComp commented on a change in pull request #15832:
URL: https://github.com/apache/flink/pull/15832#discussion_r630818299
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
##########
@@ -160,8 +177,94 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
.discardState();
}
+ @Test
+ public void testCleanupForGenericFailure() throws Exception {
+ testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
+ }
+
+ @Test
+ public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
+ testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
+ }
+
+ private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls)
+ throws Exception {
+ final JobVertexID jobVertexID1 = new JobVertexID();
+
+ final ExecutionGraph graph =
+ new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID1)
+ .build();
+
+ final ExecutionVertex vertex = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
+ final ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
+
+ final StandaloneCheckpointIDCounter checkpointIDCounter =
+ new StandaloneCheckpointIDCounter();
+
+ final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
+ new ManuallyTriggeredScheduledExecutor();
+
+ final CompletedCheckpointStore completedCheckpointStore =
+ new FailingCompletedCheckpointStore(
+ (checkpoint, ignoredCleaner, ignoredPostCleanCallback) -> {
+ throw failure;
+ });
+
+ final AtomicInteger cleanupCallCount = new AtomicInteger(0);
+ final CheckpointCoordinator checkpointCoordinator =
+ new CheckpointCoordinatorBuilder()
+ .setExecutionGraph(graph)
+ .setCheckpointIDCounter(checkpointIDCounter)
+ .setCheckpointsCleaner(
+ new CheckpointsCleaner() {
+
+ private static final long serialVersionUID =
+ 2029876992397573325L;
+
+ @Override
+ public void cleanCheckpointOnFailedStoring(
+ CompletedCheckpoint completedCheckpoint,
+ Executor executor) {
+ cleanupCallCount.incrementAndGet();
+ super.cleanCheckpointOnFailedStoring(
+ completedCheckpoint, executor);
+ }
+ })
+ .setCompletedCheckpointStore(completedCheckpointStore)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build();
+ checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ try {
+ checkpointCoordinator.receiveAcknowledgeMessage(
+ new AcknowledgeCheckpoint(
+ graph.getJobID(), attemptId, checkpointIDCounter.getLast()),
+ "unknown location");
+ fail("CheckpointException should have been thrown.");
+ } catch (CheckpointException e) {
+ assertThat(
+ e.getCheckpointFailureReason(),
+ is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
+ }
+
+ assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
+ }
+
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
+ private final TriConsumerWithException<
+ CompletedCheckpoint, CheckpointsCleaner, Runnable, Exception>
+ addCheckpointConsumer;
+
+ public FailingCompletedCheckpointStore(
+ TriConsumerWithException<
+ CompletedCheckpoint, CheckpointsCleaner, Runnable, Exception>
+ addCheckpointConsumer) {
+ this.addCheckpointConsumer = addCheckpointConsumer;
Review comment:
You're right. That would be sufficient, considering that
`FailingCompletedCheckpointStore` is only used internally and does not need
this extended flexibility. Generalizing these kinds of builders was just a
habit, but there is no need for it since the class isn't used anywhere else.
I'm going to revert that change and go with your proposal.
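
For illustration only, here is a minimal sketch of what the simpler variant could look like. The proposal itself is not quoted in this message, so this assumes it means handing the exception to the store directly instead of a `TriConsumerWithException`. `Store`, `FailingStore`, `InconsistentStateException` and `cleanupCallsFor` are hypothetical stand-ins, not Flink classes, and the cleanup decision is reduced to a plain catch block.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FailingStoreSketch {

    /** Hypothetical stand-in for PossibleInconsistentStateException. */
    static class InconsistentStateException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical minimal store interface; NOT Flink's CompletedCheckpointStore. */
    interface Store {
        void add(String checkpoint) throws Exception;
    }

    /** Simplified failing store: it only needs to know which exception to throw. */
    static final class FailingStore implements Store {
        private final Exception addFailure;

        FailingStore(Exception addFailure) {
            this.addFailure = addFailure;
        }

        @Override
        public void add(String checkpoint) throws Exception {
            throw addFailure;
        }
    }

    /** Returns how many cleanup calls happen when storing fails with the given exception. */
    static int cleanupCallsFor(Exception failure) {
        AtomicInteger cleanupCalls = new AtomicInteger();
        Store store = new FailingStore(failure);
        try {
            store.add("checkpoint-1");
        } catch (InconsistentStateException e) {
            // The checkpoint may or may not have been persisted, so cleanup is skipped.
        } catch (Exception e) {
            // Generic failure: the checkpoint was not stored, so it is cleaned up.
            cleanupCalls.incrementAndGet();
        }
        return cleanupCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(cleanupCallsFor(new RuntimeException("Expected exception")));
        System.out.println(cleanupCallsFor(new InconsistentStateException()));
    }
}
```

The two calls in `main` mirror the two test cases in the diff: a generic failure leads to one cleanup call, the inconsistent-state case to none. The functional-interface variant would only pay off if some caller needed to inspect the arguments passed to the store.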