StefanRRichter commented on a change in pull request #8322: [FLINK-12364] 
Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293543043
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##########
 @@ -288,6 +312,75 @@ public void 
testCheckpointAbortsIfAckTasksAreNotExecuted() {
                }
        }
 
+       @Test
+       public void 
testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
+               final JobID jid = new JobID();
+               final long timestamp = System.currentTimeMillis();
+
+               // create some mock Execution vertices that receive the 
checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+               ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+               CheckpointFailureManager checkpointFailureManager = new 
CheckpointFailureManager(0, () -> {
+                       throw new RuntimeException("Exceeded checkpoint failure 
tolerance number!");
+               });
+
+               // set up the coordinator
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       new MemoryStateBackend(),
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       checkpointFailureManager);
+
+               try {
+                       // trigger the checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
+
+                       // acknowledge from one of the tasks
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId), 
TASK_MANAGER_LOCATION_INFO);
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+                       // decline checkpoint from the other task
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+
+                       fail("Test failed.");
+               }
+               catch (Exception e) {
+                       //expected
+                       assertTrue(e instanceof RuntimeException);
+                       assertEquals("Exceeded checkpoint failure tolerance 
number!", e.getMessage());
 
 Review comment:
   If you have to do a check like this, at least better put the string into a 
variable instead of repeating it. It probably is even better to create a 
particular instance of runtime exception, put that in a variable and check for 
referential equality.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to