yanghua commented on a change in pull request #7571: [FLINK-10724] Refactor 
failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r250843974
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##########
 @@ -505,6 +511,135 @@ public void testTriggerAndDeclineCheckpointComplex() {
                }
        }
 
+       /**
+        * This test triggers two checkpoints and then sends a decline message 
which contains a task
+        * failure exception for the first checkpoint. This should discard the 
first checkpoint while
+        * not triggering a new checkpoint because a later checkpoint is 
already in progress.
+        */
+       @Test
+       public void testTriggerAndDeclineWithExecutionFailure() {
+               try {
+                       final JobID jid = new JobID();
+                       final long timestamp = System.currentTimeMillis();
+
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               new MemoryStateBackend(),
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               true);
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, coord.getNumScheduledTasks());
+
+                       // trigger the first checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       // trigger second checkpoint, should also succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp + 2, 
false));
+
+                       // validate that we have a pending checkpoint
+                       assertEquals(2, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(2, coord.getNumScheduledTasks());
+
+                       Iterator<Map.Entry<Long, PendingCheckpoint>> it = 
coord.getPendingCheckpoints().entrySet().iterator();
+                       long checkpoint1Id = it.next().getKey();
+                       long checkpoint2Id = it.next().getKey();
+                       PendingCheckpoint checkpoint1 = 
coord.getPendingCheckpoints().get(checkpoint1Id);
+                       PendingCheckpoint checkpoint2 = 
coord.getPendingCheckpoints().get(checkpoint2Id);
+
+                       assertNotNull(checkpoint1);
+                       assertEquals(checkpoint1Id, 
checkpoint1.getCheckpointId());
+                       assertEquals(timestamp, 
checkpoint1.getCheckpointTimestamp());
+                       assertEquals(jid, checkpoint1.getJobId());
+                       assertEquals(2, 
checkpoint1.getNumberOfNonAcknowledgedTasks());
+                       assertEquals(0, 
checkpoint1.getNumberOfAcknowledgedTasks());
+                       assertEquals(0, checkpoint1.getOperatorStates().size());
+                       assertFalse(checkpoint1.isDiscarded());
+                       assertFalse(checkpoint1.isFullyAcknowledged());
+
+                       assertNotNull(checkpoint2);
+                       assertEquals(checkpoint2Id, 
checkpoint2.getCheckpointId());
+                       assertEquals(timestamp + 2, 
checkpoint2.getCheckpointTimestamp());
+                       assertEquals(jid, checkpoint2.getJobId());
+                       assertEquals(2, 
checkpoint2.getNumberOfNonAcknowledgedTasks());
+                       assertEquals(0, 
checkpoint2.getNumberOfAcknowledgedTasks());
+                       assertEquals(0, checkpoint2.getOperatorStates().size());
+                       assertFalse(checkpoint2.isDiscarded());
+                       assertFalse(checkpoint2.isFullyAcknowledged());
+
+                       // check that the vertices received the trigger 
checkpoint message
+                       {
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), 
any(CheckpointOptions.class));
+                       }
+
+                       // check that the vertices received the trigger 
checkpoint message for the second checkpoint
+                       {
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), 
any(CheckpointOptions.class));
+                       }
+
+                       // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
+                       SerializedThrowable failedReason = new 
SerializedThrowable(new Throwable("Execution failed."));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id, failedReason));
+                       assertTrue(checkpoint1.isDiscarded());
+
+                       verify(vertex1, times(1)).fail(any(Throwable.class));
+
+                       // validate that we have only one pending checkpoint 
left
 
 Review comment:
   > The logic here seems buggy: we triggered two checkpoints --> declined one 
checkpoint --> validate that there is only one pending checkpoint left.
   > Maybe you should trigger another checkpoint before validating.
   
   You have said `we triggered two checkpoints`. What does this (`Maybe you 
should trigger another checkpoint before validating.`) mean?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to