Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149703888 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -58,99 +59,144 @@ import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.Executor; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TaskAsyncCallTest { - private static final int NUM_CALLS = 1000; - + private static int numCalls; + + /** Triggered at the beginning of {@link CheckpointsInOrderInvokable#invoke()}. */ private static OneShotLatch awaitLatch; + + /** + * Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)} + * was called {@link #numCalls} times. + */ private static OneShotLatch triggerLatch; + private static final List<ClassLoader> classLoaders = new ArrayList<>(); + @Before public void createQueuesAndActors() { + numCalls = 1000; + awaitLatch = new OneShotLatch(); triggerLatch = new OneShotLatch(); + + classLoaders.clear(); } // ------------------------------------------------------------------------ // Tests // ------------------------------------------------------------------------ - + @Test - public void testCheckpointCallsInOrder() { - try { - Task task = createTask(); + public void testCheckpointCallsInOrder() throws Exception { + Task task = createTask(CheckpointsInOrderInvokable.class); + try (TaskCleaner ignored = new TaskCleaner(task)) { task.startTaskThread(); - + awaitLatch.await(); - - for (int i = 1; i <= NUM_CALLS; i++) { + + for (int i = 1; i <= numCalls; i++) { task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint()); } - + triggerLatch.await(); - + assertFalse(task.isCanceledOrFailed()); ExecutionState currentState = task.getExecutionState(); - if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) { - fail("Task should be RUNNING or FINISHED, but is " + currentState); - } - - task.cancelExecution(); --- End diff -- I moved this to the `AutoCloseable` `TaskCleaner`.
---