Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149637696 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -58,99 +58,119 @@ import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.Executor; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TaskAsyncCallTest { - private static final int NUM_CALLS = 1000; + private static int numCalls; private static OneShotLatch awaitLatch; private static OneShotLatch triggerLatch; + private static List<ClassLoader> classLoaders; + @Before public void createQueuesAndActors() { + numCalls = 1000; + awaitLatch = new OneShotLatch(); triggerLatch = new OneShotLatch(); + + classLoaders = new ArrayList<>(); } // ------------------------------------------------------------------------ // Tests // ------------------------------------------------------------------------ - + @Test - public void testCheckpointCallsInOrder() { - try { - Task task = createTask(); - task.startTaskThread(); - - awaitLatch.await(); - - for (int i = 1; i <= NUM_CALLS; i++) { - task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint()); - } - - triggerLatch.await(); - - assertFalse(task.isCanceledOrFailed()); + public void testCheckpointCallsInOrder() throws Exception { + Task task = createTask(CheckpointsInOrderInvokable.class); + task.startTaskThread(); - ExecutionState currentState = task.getExecutionState(); - if (currentState != ExecutionState.RUNNING && currentState != 
ExecutionState.FINISHED) { - fail("Task should be RUNNING or FINISHED, but is " + currentState); - } - - task.cancelExecution(); - task.getExecutingThread().join(); + awaitLatch.await(); + + for (int i = 1; i <= numCalls; i++) { + task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint()); } - catch (Exception e) { --- End diff -- Yep, the diff on GitHub is a bit hard to read but I figured it out. :)
---