XComp commented on a change in pull request #15832:
URL: https://github.com/apache/flink/pull/15832#discussion_r630818299



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
##########
@@ -160,8 +177,94 @@ public void testFailingCompletedCheckpointStoreAdd() 
throws Exception {
                 .discardState();
     }
 
+    @Test
+    public void testCleanupForGenericFailure() throws Exception {
+        testStoringFailureHandling(new FlinkRuntimeException("Expected 
exception"), 1);
+    }
+
+    @Test
+    public void testCleanupOmissionForPossibleInconsistentStateException() 
throws Exception {
+        testStoringFailureHandling(new PossibleInconsistentStateException(), 
0);
+    }
+
+    private void testStoringFailureHandling(Exception failure, int 
expectedCleanupCalls)
+            throws Exception {
+        final JobVertexID jobVertexID1 = new JobVertexID();
+
+        final ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID1)
+                        .build();
+
+        final ExecutionVertex vertex = 
graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
+        final ExecutionAttemptID attemptId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+
+        final StandaloneCheckpointIDCounter checkpointIDCounter =
+                new StandaloneCheckpointIDCounter();
+
+        final ManuallyTriggeredScheduledExecutor 
manuallyTriggeredScheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+
+        final CompletedCheckpointStore completedCheckpointStore =
+                new FailingCompletedCheckpointStore(
+                        (checkpoint, ignoredCleaner, ignoredPostCleanCallback) 
-> {
+                            throw failure;
+                        });
+
+        final AtomicInteger cleanupCallCount = new AtomicInteger(0);
+        final CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setExecutionGraph(graph)
+                        .setCheckpointIDCounter(checkpointIDCounter)
+                        .setCheckpointsCleaner(
+                                new CheckpointsCleaner() {
+
+                                    private static final long serialVersionUID 
=
+                                            2029876992397573325L;
+
+                                    @Override
+                                    public void cleanCheckpointOnFailedStoring(
+                                            CompletedCheckpoint 
completedCheckpoint,
+                                            Executor executor) {
+                                        cleanupCallCount.incrementAndGet();
+                                        super.cleanCheckpointOnFailedStoring(
+                                                completedCheckpoint, executor);
+                                    }
+                                })
+                        .setCompletedCheckpointStore(completedCheckpointStore)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build();
+        
checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        try {
+            checkpointCoordinator.receiveAcknowledgeMessage(
+                    new AcknowledgeCheckpoint(
+                            graph.getJobID(), attemptId, 
checkpointIDCounter.getLast()),
+                    "unknown location");
+            fail("CheckpointException should have been thrown.");
+        } catch (CheckpointException e) {
+            assertThat(
+                    e.getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
+        }
+
+        assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
+    }
+
     private static final class FailingCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
+        private final TriConsumerWithException<
+                        CompletedCheckpoint, CheckpointsCleaner, Runnable, 
Exception>
+                addCheckpointConsumer;
+
+        public FailingCompletedCheckpointStore(
+                TriConsumerWithException<
+                                CompletedCheckpoint, CheckpointsCleaner, 
Runnable, Exception>
+                        addCheckpointConsumer) {
+            this.addCheckpointConsumer = addCheckpointConsumer;

Review comment:
       You're right. It would be sufficient enough considering that 
`FailingCompletedCheckpointStore` is only used internally and does not need 
this extended flexibility. It was just a habit to generalize these kind of 
builders. But there is no need to do that considering that it's not used 
anywhere else. I'm gonna revert that change and go with your proposal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to